streaming/reconnect

Wire live turns into a host UI, resume browsers with session cursors, and carry Lash activity across service boundaries without adding a second protocol.

Choose The Surface

Lash exposes three related streaming surfaces. Pick the smallest one that matches the boundary you are crossing.

The runnable browser examples use this split directly: turn-local streams are convenient inside the process that started a turn, while browser reconnect is always session-level observation with an opaque cursor. App/product rows and Lash observation rows can share one NDJSON or SSE response, but they remain separate contracts.

one active turn

Use TurnBuilder::stream_to(&sink) or pull-style TurnBuilder::stream() when caller and runtime are in the same process and the UI only needs events from the turn it just started.

session reconnect

Use LashSession::observe() and ObservableSession when a browser, worker, or service needs a reconnectable stream for the whole session. The cursor is session-level and opaque.

remote boundary

Use lash::remote DTOs, or lash_remote_protocol with core-conversions, when activity crosses HTTP, queues, callbacks, or another process.

Turn-Local Streams

Turn streams emit TurnActivity for one running turn. They are not durable history and they are not the reconnect cursor.

async fn stream_one_turn(
    session: &LashSession,
    sink: &dyn TurnActivitySink,
) -> lash::Result<lash::TurnResult> {
    session
        .turn(TurnInput::text("Summarize the incident."))
        .stream_to(sink)
        .await
}

async fn pull_one_turn(session: &LashSession) -> anyhow::Result<lash::TurnResult> {
    use futures_util::StreamExt as _;

    let mut stream = session
        .turn(TurnInput::text("Summarize the incident."))
        .stream()?;

    while let Some(activity) = stream.next().await {
        render_activity(activity?).await?;
    }

    Ok(stream.finish().await?)
}

Use stream_to when you already have a sink object, stream when you want a pull-style futures_util::Stream, and run when you only need the collected TurnOutput. TurnStream implements Stream<Item = Result<TurnActivity>>; next_activity() is a compatibility wrapper.

stream_to awaits the sink's emit(), so a slow sink stalls delivery of later activity. If a UI transport can block, push into a bounded channel and drain it from a separate task; the channel bound becomes your backpressure policy.
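The bounded-channel advice can be sketched without any Lash types. The following is a minimal, dependency-free illustration that uses std threads and std::sync::mpsc::sync_channel in place of an async runtime; Activity, ChannelSink, and run_with_bound are hypothetical names for this sketch, not part of the Lash API.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::thread;

/// Hypothetical stand-in for a turn activity; not the Lash type.
#[derive(Debug, Clone, PartialEq)]
pub struct Activity(pub String);

/// Hypothetical sink that forwards into a bounded channel instead of
/// writing to a slow UI transport directly.
pub struct ChannelSink {
    tx: SyncSender<Activity>,
}

impl ChannelSink {
    /// `bound` is the backpressure policy: how many activities may be
    /// buffered before `emit` blocks the producer.
    pub fn new(bound: usize) -> (Self, Receiver<Activity>) {
        let (tx, rx) = sync_channel(bound);
        (Self { tx }, rx)
    }

    /// Blocks while the buffer is full; returns false once the drain
    /// side has gone away.
    pub fn emit(&self, activity: Activity) -> bool {
        self.tx.send(activity).is_ok()
    }
}

/// Emits `count` activities through a sink with the given bound while a
/// separate thread drains them, and returns what the drain thread saw.
pub fn run_with_bound(bound: usize, count: usize) -> Vec<Activity> {
    let (sink, rx) = ChannelSink::new(bound);
    // The drain thread plays the role of the UI transport writer.
    let drain = thread::spawn(move || rx.iter().collect::<Vec<_>>());
    for i in 0..count {
        assert!(sink.emit(Activity(format!("delta {i}"))));
    }
    drop(sink); // closing the sender ends the drain loop
    drain.join().expect("drain thread panicked")
}
```

A small bound keeps memory flat and slows the producer when the transport falls behind; a large bound absorbs bursts at the cost of latency. In an async host the same shape would presumably use the runtime's own bounded channel.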

Session Reconnect

ObservableSession is the browser reconnect surface. It returns the current read view plus an opaque cursor, replays recent session observation events after a cursor, or reports a gap with a fresh observation. Persistent transports should use subscribe_and_recover; resume_from_cursor remains the one-shot poll alternative.

use futures_util::StreamExt as _;
use lash::observe::SessionObservationStreamItem;

async fn reconnect_session(
    session: &LashSession,
    stored_cursor: Option<lash::observe::SessionCursor>,
) -> anyhow::Result<lash::observe::SessionCursor> {
    let observable = session.observe();

    // First connect: no stored cursor, so seed the UI from the current
    // read view and start from the cursor that came with it.
    let mut cursor = match stored_cursor {
        Some(cursor) => cursor,
        None => {
            let observation = observable.current_observation();
            replace_from_read_view(&observation.read_view).await?;
            observation.cursor
        }
    };

    persist_cursor(&cursor).await?;

    let mut live = observable.subscribe_and_recover(cursor.clone());
    while let Some(item) = live.next().await {
        match item? {
            SessionObservationStreamItem::Event(event) => {
                cursor = event.cursor.clone();
                fold_session_event(event).await?;
            }
            // The stored cursor could not be replayed: replace UI state
            // from the fresh read view and continue from the latest cursor.
            SessionObservationStreamItem::Gap { observation, gap } => {
                replace_from_read_view(&observation.read_view).await?;
                cursor = gap.latest_cursor;
            }
        }
        // Persist after every item so a reconnect resumes from here.
        persist_cursor(&cursor).await?;
    }

    Ok(cursor)
}

Frontend Folding

The frontend should fold live activity optimistically, then refresh settled state when the runtime commits.

match event.payload {
    SessionObservationEventPayload::TurnActivity(activity) => {
        render_activity(activity).await?;
    }
    SessionObservationEventPayload::Committed { read_view } => {
        append_committed_view(&read_view).await?;
    }
    SessionObservationEventPayload::AgentFrameSwitched { frame_id } => {
        update_frame(&frame_id).await?;
    }
    SessionObservationEventPayload::QueueChanged { batch_ids, .. } => {
        // Queue events cover both pending user input ids and non-user
        // queued-work batch ids; refresh the surfaces separately.
        refetch_pending_turn_inputs(&batch_ids).await?;
        refetch_queued_work_summaries(&batch_ids).await?;
    }
    SessionObservationEventPayload::ProcessChanged { process_ids, .. } => {
        refetch_process_summaries(&process_ids).await?;
    }
}

row identity

Use TurnActivity.correlation_id for stable rows. Tool start/complete, code start/complete, reasoning, and terminal values should update the same logical UI row when they share a correlation id.

sequence

RemoteTurnActivity.sequence is per-stream ordering only. It is useful inside one NDJSON or SSE response, but it is not durable and it is not a reconnect token.

normal prose

Normal assistant text streams as AssistantProseDelta and has no final-message event. The settled transcript comes from TurnResult or the session read view after commit.

committed

Committed means live assumptions are now settled. Refresh or reconcile from the read view instead of treating prior live deltas as authoritative durable history.

process and queue

ProcessChanged and QueueChanged are invalidation signals carrying ids. They are not full status payloads; refetch process or queue summaries through the existing admin APIs.
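The row-identity and commit rules above can be sketched as a small fold. This is an illustration under stated assumptions: Event, Row, and View are hypothetical, simplified types rather than the Lash payloads, and replacing the settled transcript wholesale on commit is one possible reconcile strategy.

```rust
use std::collections::BTreeMap;

/// Hypothetical, simplified events for illustration; not the Lash types.
#[derive(Debug, Clone)]
pub enum Event {
    ToolStarted { correlation_id: String, name: String },
    ToolCompleted { correlation_id: String, output: String },
    ProseDelta(String),
    Committed { transcript: Vec<String> },
}

/// One logical UI row, keyed by correlation id.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct Row {
    pub title: String,
    pub status: String,
    pub detail: String,
}

/// Optimistic live state plus the last settled transcript.
#[derive(Debug, Default)]
pub struct View {
    pub settled: Vec<String>,
    pub live_prose: String,
    pub live_rows: BTreeMap<String, Row>,
}

impl View {
    pub fn apply(&mut self, event: Event) {
        match event {
            Event::ToolStarted { correlation_id, name } => {
                let row = self.live_rows.entry(correlation_id).or_default();
                row.title = name;
                row.status = "running".to_string();
            }
            Event::ToolCompleted { correlation_id, output } => {
                // Same correlation id: update the existing row in place
                // instead of appending a second row.
                let row = self.live_rows.entry(correlation_id).or_default();
                row.status = "done".to_string();
                row.detail = output;
            }
            // Prose has no final-message event, so deltas only accumulate.
            Event::ProseDelta(text) => self.live_prose.push_str(&text),
            Event::Committed { transcript } => {
                // Settled state wins; live deltas are no longer authoritative.
                self.settled = transcript;
                self.live_prose.clear();
                self.live_rows.clear();
            }
        }
    }
}
```

Keying rows by correlation id makes start and completion idempotent with respect to row count, and clearing live state on commit keeps the UI from showing both the optimistic deltas and the settled transcript.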

Gap Recovery

Live replay is bounded. A stale, trimmed, or otherwise unavailable cursor yields a recoverable gap instead of making the client guess what happened.

discard

Discard assumptions from missed live events. Do not try to infer what happened from partial stream state.

refresh

In-process callers receive SessionObservationStreamItem::Gap { observation, gap } and can replace UI state from observation.read_view. Remote callers receive RemoteSessionObservationStreamItem::Gap { observation, gap }; use the RemoteSessionObservation cursor/usage snapshot and refresh any app-owned product projection because the remote DTO intentionally does not serialize SessionReadView.

continue

Store gap.latest_cursor and keep folding new events. subscribe_and_recover and subscribe_and_recover_remote perform the resubscribe loop internally after yielding the gap item.

remote gap

RemoteLiveReplayGap carries session_id, requested_cursor, latest_cursor, latest_revision, and reason (trimmed or unavailable). Treat it as a synchronization marker, not as durable transcript data.
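The discard, refresh, continue sequence amounts to a two-arm fold on the client. The sketch below assumes hypothetical Cursor, Item, and Client types and a snapshot expressed as plain rows; the real gap item carries an observation and a gap record as described above.

```rust
/// Hypothetical opaque cursor; the client stores it but never inspects it.
#[derive(Debug, Clone, PartialEq)]
pub struct Cursor(pub String);

/// Hypothetical stream item: either a live event or a gap marker that
/// carries a fresh snapshot.
pub enum Item {
    Event { cursor: Cursor, delta: String },
    Gap { snapshot: Vec<String>, latest_cursor: Cursor },
}

/// Hypothetical client state: the cursor to persist and the rendered rows.
pub struct Client {
    pub cursor: Cursor,
    pub rows: Vec<String>,
}

impl Client {
    pub fn fold(&mut self, item: Item) {
        match item {
            Item::Event { cursor, delta } => {
                self.rows.push(delta);
                self.cursor = cursor;
            }
            Item::Gap { snapshot, latest_cursor } => {
                // Discard: drop everything folded from live events.
                // Refresh: replace it with the snapshot.
                self.rows = snapshot;
                // Continue: later events fold on top from this cursor.
                self.cursor = latest_cursor;
            }
        }
    }
}
```

The gap arm never merges old rows with the snapshot, which is the point: missed events are not inferred, they are superseded.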

Remote Boundaries

Lash provides semantic DTOs. The host owns HTTP routes, auth, tenancy, retention, and wire framing.

NDJSON is the recommended framing for activity streams because each RemoteTurnActivity or RemoteSessionObservationEvent remains ordinary JSON and newline boundaries are easy to proxy. SSE is also acceptable when the browser integration benefits from native EventSource reconnect; use the same DTOs as event data.
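On the consuming side of an NDJSON stream, transport chunks rarely align with line boundaries. A minimal sketch of a reassembly buffer for a Rust worker follows; LineBuffer is a hypothetical helper, not a Lash type, and each yielded line would then be parsed as one JSON row by whatever decoder the host uses.

```rust
/// Accumulates transport chunks and yields complete NDJSON lines.
#[derive(Default)]
pub struct LineBuffer {
    pending: Vec<u8>,
}

impl LineBuffer {
    /// Appends a chunk and returns every line it completed, without the
    /// trailing newline. Blank lines are skipped; a partial trailing
    /// line stays buffered until its newline arrives.
    pub fn push(&mut self, chunk: &[u8]) -> Vec<String> {
        self.pending.extend_from_slice(chunk);
        let mut lines = Vec::new();
        while let Some(pos) = self.pending.iter().position(|b| *b == b'\n') {
            // Take the line including its newline out of the buffer.
            let line: Vec<u8> = self.pending.drain(..=pos).collect();
            // Splitting on the newline byte first keeps multi-byte UTF-8
            // sequences intact even when a chunk boundary cuts through one.
            let text = String::from_utf8_lossy(&line[..pos]);
            let text = text.trim_end_matches('\r');
            if !text.is_empty() {
                lines.push(text.to_string());
            }
        }
        lines
    }
}
```

For example, pushing `{"a":1}\n{"b"` yields one line and buffers the rest; pushing `:2}\n` afterwards yields `{"b":2}`.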

A common browser endpoint sends host-defined product rows such as message, error, and done beside Lash rows such as replay_cursor, observation, and replay_gap. Product rows are owned by the app. The Lash rows should use RemoteSessionObservationEvent, RemoteSessionObservation, and RemoteLiveReplayGap so cursors, revisions, event payloads, and gap snapshots keep the same remote contract across HTTP, queues, and workflow callbacks.

use lash_remote_protocol::RemoteTurnActivitySink;

async fn stream_turn_as_ndjson(session: &LashSession) -> anyhow::Result<Vec<u8>> {
    let sink = RemoteTurnActivitySink::new(Vec::<u8>::new(), 0);

    session
        .turn(TurnInput::text("Summarize the incident."))
        .stream_to(&sink)
        .await?;

    let errors = sink.take_errors();
    if !errors.is_empty() {
        anyhow::bail!("remote stream write failed: {}", errors.join("; "));
    }

    sink.into_inner()
        .map_err(|_| anyhow::anyhow!("remote stream writer still borrowed"))
}

For session observation streams, use the DTO recovery helper at your service edge. It assigns per-response event sequence values, returns remote gap snapshots, and keeps clients on opaque persisted cursors.

use futures_util::StreamExt as _; // brings `next()` into scope for the live stream
use lash::observe::RemoteSessionObservationStreamItem;

async fn stream_remote_session_observations(
    session: &LashSession,
    stored_cursor: Option<lash::remote::observations::RemoteSessionCursor>,
) -> anyhow::Result<lash::remote::observations::RemoteSessionCursor> {
    let observable = session.observe();
    let mut cursor = match stored_cursor {
        Some(cursor) => cursor,
        None => {
            let observation = observable.current_remote_observation();
            replace_from_remote_observation(&observation).await?;
            lash::remote::observations::RemoteSessionCursor::new(observation.cursor)
        }
    };

    persist_remote_cursor(&cursor).await?;

    let mut live = observable.subscribe_and_recover_remote(cursor.clone())?;
    while let Some(item) = live.next().await {
        match item? {
            RemoteSessionObservationStreamItem::Event(event) => {
                cursor = lash::remote::observations::RemoteSessionCursor::new(event.cursor.clone());
                send_remote_session_line(serde_json::to_string(&event)?).await?;
            }
            RemoteSessionObservationStreamItem::Gap { observation, gap } => {
                cursor =
                    lash::remote::observations::RemoteSessionCursor::new(gap.latest_cursor.clone());
                replace_from_remote_observation(&observation).await?;
                send_remote_session_line(serde_json::to_string(&gap)?).await?;
            }
        }
        persist_remote_cursor(&cursor).await?;
    }

    Ok(cursor)
}

Where Next

This page owns live UI and reconnect wiring. Use the turns guide for event taxonomy and the remote protocol guide for DTO validation rules.
