Real-time Streaming
TL;DR
Stream terminal output in real-time using client.terminal.stream() with event handlers
for output, disconnect, reconnect, and errors. Supports sync and async output handlers,
pattern detection, buffered processing for high-volume output, multi-machine streaming,
WebSocket forwarding (FastAPI), AI agent event streaming, heartbeat monitoring, and
backpressure handling.
Stream terminal output and events in real-time.
How do I stream terminal output?
from cmdop import AsyncCMDOPClient
# Connect to the remote CMDOP server
async with AsyncCMDOPClient.remote(api_key="cmd_xxx") as client:
# Get the active terminal session for the target machine
session = await client.terminal.get_active_session("my-server")
# Create a new stream instance
stream = client.terminal.stream()
# Register a callback to print output as it arrives
stream.on_output(lambda data: print(data.decode(), end=""))
# Attach the stream to the terminal session
await stream.attach(session.session_id)
# Send a command to the remote terminal
await stream.send_input(b"tail -f /var/log/app.log\n")
# Keep the connection open to receive output for 60 seconds
    await asyncio.sleep(60)

What event handlers are available?
How do I handle output events?
# Synchronous output handler: receives raw bytes from the terminal
def on_output(data: bytes) -> None:
    # Decode and echo in one step; flush=True makes output appear immediately.
    print(data.decode(), end="", flush=True)
# Register the handler to receive terminal output
stream.on_output(on_output)

How do I use async output handlers?
# Async output handler: use for I/O-bound operations like DB writes
# NOTE(review): save_to_database / send_to_websocket are user-supplied
# coroutines, not part of the SDK — confirm against your own code.
async def on_output_async(data: bytes) -> None:
    await save_to_database(data)  # Persist output to database
    await send_to_websocket(data)  # Forward to connected WebSocket clients
# Register the async handler (runs in the event loop)
stream.on_output_async(on_output_async)

How do I handle disconnects?
# Called when the stream connection is lost
def on_disconnect() -> None:
    print("Connection lost")
    # Handle reconnection logic here
# Register the disconnect handler
stream.on_disconnect(on_disconnect)

How do I handle reconnects?
# Called when the stream successfully reconnects after a disconnect
def on_reconnect() -> None:
    print("Reconnected")
# Register the reconnect handler
stream.on_reconnect(on_reconnect)

How do I handle stream errors?
# Called when a stream error occurs (e.g. network failure, protocol error)
def on_error(error: Exception) -> None:
    print(f"Error: {error}")
    # NOTE(review): log_error is a user-supplied helper, not part of the SDK
    log_error(error)
# Register the error handler
stream.on_error(on_error)

How do I collect output into a buffer?
# Accumulate all output chunks into a list
buffer = []

def collect(data: bytes):
    buffer.append(data)

# Register the collector as the output handler
stream.on_output(collect)
# Send a command and wait for output to arrive
await stream.send_input(b"ls -la\n")
await asyncio.sleep(2)
# Join all chunks into a single decoded string
full_output = b"".join(buffer).decode()

How do I wait for a specific pattern?
# Wait until a specific text pattern appears in the stream output
async def wait_for(stream, pattern: str, timeout: float = 30):
    chunks = []                 # accumulated output fragments
    matched = asyncio.Event()   # set once the pattern shows up
    needle = pattern.encode()   # encode once, outside the hot callback

    def check(data: bytes):
        chunks.append(data)
        # Search the combined output collected so far
        if needle in b"".join(chunks):
            matched.set()

    stream.on_output(check)
    # Block until the pattern arrives, or raise TimeoutError after `timeout`s
    await asyncio.wait_for(matched.wait(), timeout=timeout)
    return b"".join(chunks).decode()
# Usage: block until "Build complete" appears in the output
output = await wait_for(stream, "Build complete")

How do I detect patterns in output?
import re
# Classify stream output using regex pattern matching
class PatternDetector:
    """Route output chunks to callbacks keyed by named regex patterns."""

    def __init__(self):
        # Named patterns recognised in terminal output
        self.patterns = {
            "error": re.compile(r"ERROR|FATAL|Exception"),
            "warning": re.compile(r"WARN|WARNING"),
            "success": re.compile(r"SUCCESS|COMPLETE|DONE"),
        }
        # pattern name -> handler invoked with the decoded text
        self.callbacks = {}

    def on_pattern(self, pattern_name: str, callback):
        """Register a callback for a specific pattern name."""
        self.callbacks[pattern_name] = callback

    def process(self, data: bytes):
        """Check incoming data against all registered patterns."""
        text = data.decode()
        for name, regex in self.patterns.items():
            # Fire the handler only when the pattern matches AND a
            # callback was registered for it
            if regex.search(text) and name in self.callbacks:
                self.callbacks[name](text)
# Create detector and register an alert for errors
detector = PatternDetector()
# NOTE(review): send_alert is your own notification hook (email, Slack, ...)
detector.on_pattern("error", lambda text: send_alert(text))
# Pipe all stream output through the pattern detector
stream.on_output(detector.process)

How do I handle high-volume output?
For high-volume output:
from collections import deque
# Buffer incoming data and process in batches at a fixed interval
class BufferedHandler:
    """Batch high-volume stream output and flush it periodically."""

    def __init__(self, flush_interval: float = 0.1):
        self.buffer = deque()                 # pending chunks, FIFO order
        self.flush_interval = flush_interval  # Seconds between batch flushes

    def add(self, data: bytes):
        """Append data to the buffer (called by the stream)."""
        self.buffer.append(data)

    async def run(self):
        """Continuously flush buffered data at the configured interval."""
        while True:
            if self.buffer:
                # Drain item by item: unlike snapshot-then-clear(), popleft()
                # cannot drop chunks appended between the snapshot and the
                # clear, even if add() is ever invoked from another thread
                # (deque append/popleft are thread-safe).
                batch = []
                while self.buffer:
                    batch.append(self.buffer.popleft())
                await self.process_batch(batch)
            await asyncio.sleep(self.flush_interval)

    async def process_batch(self, batch: list[bytes]):
        """Process a batch of output chunks at once."""
        combined = b"".join(batch)
        print(combined.decode(), end="")
# Wire the buffered handler into the stream
handler = BufferedHandler()
# The stream feeds chunks in; the background task flushes them in batches
stream.on_output(handler.add)
asyncio.create_task(handler.run())  # Start the flush loop in the background

How do I stream from multiple machines?
# Stream output from multiple machines simultaneously
servers = ["web-1", "web-2", "web-3"]
streams = []
for server in servers:
    # Get each server's active session and create a stream
    session = await client.terminal.get_active_session(server)
    stream = client.terminal.stream()
    # Prefix each line with the server name; `s=server` binds the loop
    # variable at definition time, avoiding the late-binding closure pitfall
    stream.on_output(lambda data, s=server: print(f"[{s}] {data.decode()}", end=""))
    # Attach the stream to the session
    await stream.attach(session.session_id)
    streams.append(stream)
# All streams are now active and receiving output concurrently
await asyncio.sleep(60)
# Cleanup: close all streams when done
for stream in streams:
    await stream.close()

How do I forward to a WebSocket?
from fastapi import WebSocket
# Bridge a terminal session to a WebSocket client (e.g. browser terminal)
async def stream_to_ws(websocket: WebSocket, hostname: str) -> None:
    async with AsyncCMDOPClient.remote(api_key="cmd_xxx") as client:
        session = await client.terminal.get_active_session(hostname)
        stream = client.terminal.stream()

        # Forward terminal output to the WebSocket client
        async def forward(data: bytes):
            await websocket.send_bytes(data)

        stream.on_output_async(forward)
        await stream.attach(session.session_id)
        # Receive user input from the WebSocket and send to the terminal.
        # NOTE(review): this loop exits only when receive_bytes raises
        # (e.g. on client disconnect) — consider handling that explicitly.
        while True:
            data = await websocket.receive_bytes()
            await stream.send_input(data)

How do I stream AI agent events?
# Iterate over AI agent events as they are generated.
# NOTE(review): `Analysis` is assumed to be a user-defined schema class
# (e.g. a pydantic model) imported elsewhere — confirm.
async for event in client.agent.run_stream(
    prompt="Analyze logs",
    output_schema=Analysis
):
    match event.type:
        case "thinking":
            # Agent is reasoning about the task
            print(f"Thinking: {event.content}")
        case "tool_call":
            # Agent is invoking a tool (e.g. shell command, API call)
            print(f"Using tool: {event.tool}")
            print(f"Args: {event.args}")
        case "tool_result":
            # Tool execution completed, showing truncated result
            print(f"Tool output: {event.result[:100]}...")
        case "result":
            # Final structured result matching the output schema
            analysis: Analysis = event.output
            print(f"Final result: {analysis}")
        case "error":
            # An error occurred during agent execution
            print(f"Error: {event.error}")

How do I monitor session health?
# Monitor a session for idle timeout and send a health-check ping.
# NOTE(review): relies on `import time` and a `stream` object from the
# enclosing scope — neither is shown in this snippet; confirm.
async def monitor_session(session_id: str) -> None:
    last_activity = time.time()  # Track the last time output was received

    def on_output(data):
        nonlocal last_activity
        last_activity = time.time()  # Reset the idle timer on every output
        # Process output...

    stream.on_output(on_output)
    await stream.attach(session_id)
    while True:
        await asyncio.sleep(30)  # Check every 30 seconds
        idle_time = time.time() - last_activity
        if idle_time > 300:  # 5 minutes with no output
            print("Session idle, checking health...")
            await stream.send_input(b"echo ping\n")  # Send a keepalive command

How do I handle backpressure?
# Rate-limit processing when output arrives faster than it can be consumed
class BackpressureHandler:
    def __init__(self, max_queue: int = 1000):
        # Bounded queue: caps memory when producers outpace the consumer
        self.queue = asyncio.Queue(maxsize=max_queue)

    def add(self, data: bytes):
        """Enqueue data; drop if the queue is full (backpressure)."""
        try:
            self.queue.put_nowait(data)
        except asyncio.QueueFull:
            # Queue overflow: oldest data is preserved, newest is dropped
            print("Warning: Queue full, dropping data")

    async def consume(self):
        """Continuously dequeue and process data."""
        while True:
            data = await self.queue.get()  # Block until data is available
            await self.process(data)
            self.queue.task_done()  # Mark the item as processed

    async def process(self, data: bytes):
        """Process a single chunk at a controlled rate."""
        await asyncio.sleep(0.01)  # Throttle processing speed
        print(data.decode(), end="")

How do I log stream output?
import logging
# Wrap stream output in Python's logging framework
class StreamLogger:
    """Route terminal output chunks into Python's logging hierarchy."""

    def __init__(self, hostname: str):
        # One namespaced logger per machine, e.g. "terminal.web-1"
        name = f"terminal.{hostname}"
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.DEBUG)

    def log_output(self, data: bytes):
        """Record one output chunk as a DEBUG message."""
        text = data.decode()
        self.logger.debug(text.strip())
# Create a logger for the target host and register it with the stream
logger = StreamLogger("my-server")
stream.on_output(logger.log_output)

What are the streaming best practices?
1. Flush Output
def on_output(data):
# Always flush to ensure output appears immediately in the terminal
    print(data.decode(), end="", flush=True)  # flush=True!

2. Use Async for I/O
# Good: async handler won't block the event loop during I/O
async def on_output(data):
await save_to_db(data)
# Avoid: sync handler blocks the event loop until the write completes
def on_output(data):
    save_to_db_sync(data)  # Blocks!

3. Handle Disconnects
# Always register both disconnect and reconnect handlers
stream.on_disconnect(handle_disconnect)
stream.on_reconnect(handle_reconnect)

4. Clean Up
try:
await stream.attach(session.session_id)
# Work...
finally:
# Always close the stream to release resources
    await stream.close()

Next
- Terminal Service — Terminal operations
- Guides: Streaming — Streaming patterns
Last updated on