Skip to Content

Real-time Streaming

TL;DR

Stream terminal output in real-time using client.terminal.stream() with event handlers for output, disconnect, reconnect, and errors. Supports sync and async output handlers, pattern detection, buffered processing for high-volume output, multi-machine streaming, WebSocket forwarding (FastAPI), AI agent event streaming, heartbeat monitoring, and backpressure handling.

Stream terminal output and events in real time.

How do I stream terminal output?

import asyncio

from cmdop import AsyncCMDOPClient


async def main() -> None:
    """Attach to the active terminal on "my-server" and print its output for 60 seconds."""
    # Connect to the remote CMDOP server
    async with AsyncCMDOPClient.remote(api_key="cmd_xxx") as client:
        # Get the active terminal session for the target machine
        session = await client.terminal.get_active_session("my-server")

        # Create a new stream instance
        stream = client.terminal.stream()

        # Register a callback to print output as it arrives.
        # errors="replace" keeps a chunk that ends mid-character from raising;
        # flush=True makes the output appear immediately.
        stream.on_output(
            lambda data: print(data.decode(errors="replace"), end="", flush=True)
        )

        try:
            # Attach the stream to the terminal session
            await stream.attach(session.session_id)

            # Send a command to the remote terminal
            await stream.send_input(b"tail -f /var/log/app.log\n")

            # Keep the connection open to receive output for 60 seconds
            await asyncio.sleep(60)
        finally:
            # Always close the stream to release resources
            await stream.close()


if __name__ == "__main__":
    asyncio.run(main())

What event handlers are available?

How do I handle output events?

import codecs

# A multi-byte UTF-8 character can be split across two chunks; decoding each
# chunk independently would raise UnicodeDecodeError. The incremental decoder
# holds the partial bytes until the rest of the character arrives.
_decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")


def on_output(data: bytes) -> None:
    """Synchronous output handler: receives raw bytes from the terminal."""
    text = _decoder.decode(data)
    print(text, end="", flush=True)


# Register the handler to receive terminal output
stream.on_output(on_output)

How do I use async output handlers?

async def on_output_async(data: bytes):
    """Async output handler: use for I/O-bound operations like DB writes."""
    # Persist output to database
    await save_to_database(data)
    # Forward to connected WebSocket clients
    await send_to_websocket(data)


# Register the async handler (runs in the event loop)
stream.on_output_async(on_output_async)

How do I handle disconnects?

def on_disconnect():
    """Called when the stream connection is lost."""
    print("Connection lost")
    # Handle reconnection logic here


# Register the disconnect handler
stream.on_disconnect(on_disconnect)

How do I handle reconnects?

def on_reconnect():
    """Called when the stream successfully reconnects after a disconnect."""
    print("Reconnected")


# Register the reconnect handler
stream.on_reconnect(on_reconnect)

How do I handle stream errors?

def on_error(error: Exception):
    """Called when a stream error occurs (e.g. network failure, protocol error)."""
    print(f"Error: {error}")
    log_error(error)


# Register the error handler
stream.on_error(on_error)

How do I collect output into a buffer?

import asyncio


async def collect_output(stream, command: bytes = b"ls -la\n", wait: float = 2) -> str:
    """Send a command and return the output received within `wait` seconds.

    Args:
        stream: An attached terminal stream.
        command: Bytes to send to the terminal.
        wait: Seconds to wait for output before returning.

    Returns:
        All output chunks received so far, joined and decoded as one string.
    """
    # Accumulate all output chunks into a list
    buffer: list[bytes] = []

    def collect(data: bytes) -> None:
        buffer.append(data)

    # Register the collector as the output handler
    stream.on_output(collect)

    # Send a command and wait for output to arrive
    await stream.send_input(command)
    await asyncio.sleep(wait)

    # Join first, then decode once: characters split across chunks decode
    # correctly, and errors="replace" covers a tail cut off mid-character.
    return b"".join(buffer).decode(errors="replace")


# Usage (inside a coroutine):
#     full_output = await collect_output(stream)

How do I wait for a specific pattern?

import asyncio


async def wait_for(stream, pattern: str, timeout: float = 30) -> str:
    """Wait until a specific text pattern appears in the stream output.

    Args:
        stream: Terminal stream to listen on.
        pattern: Text to look for; it may span several output chunks.
        timeout: Seconds to wait before giving up.

    Returns:
        All output received up to the moment the wait completed, decoded.

    Raises:
        asyncio.TimeoutError: If the pattern does not appear within `timeout`.
    """
    found = asyncio.Event()  # Signals that the pattern was seen
    needle = pattern.encode()  # Encode once instead of on every chunk
    received = bytearray()  # Accumulated output

    def check(data: bytes) -> None:
        # A new match must include at least one new byte, so only the new
        # chunk plus a needle-sized overlap needs searching. This avoids
        # re-joining and re-scanning the entire history on every chunk.
        start = max(0, len(received) - len(needle))
        received.extend(data)
        if received.find(needle, start) != -1:
            found.set()

    stream.on_output(check)

    # Wait for the pattern or raise TimeoutError after `timeout` seconds
    await asyncio.wait_for(found.wait(), timeout=timeout)
    return received.decode(errors="replace")


# Usage (inside a coroutine): block until "Build complete" appears in the output
#     output = await wait_for(stream, "Build complete")

How do I detect patterns in output?

import re


class PatternDetector:
    """Classify stream output using regex pattern matching."""

    def __init__(self):
        # Named patterns to detect in output
        self.patterns = {
            "error": re.compile(r"ERROR|FATAL|Exception"),
            "warning": re.compile(r"WARN|WARNING"),
            "success": re.compile(r"SUCCESS|COMPLETE|DONE"),
        }
        # Maps pattern names to handler functions
        self.callbacks = {}

    def on_pattern(self, pattern_name: str, callback):
        """Register a callback for a specific pattern name."""
        self.callbacks[pattern_name] = callback

    def process(self, data: bytes):
        """Check incoming data against all registered patterns."""
        text = data.decode()
        for name, pattern in self.patterns.items():
            if pattern.search(text) is None:
                continue
            # Only patterns with a registered callback trigger anything
            if name in self.callbacks:
                self.callbacks[name](text)


# Create detector and register an alert for errors
detector = PatternDetector()
detector.on_pattern("error", lambda text: send_alert(text))

# Pipe all stream output through the pattern detector
stream.on_output(detector.process)

How do I handle high-volume output?

For high-volume output, buffer incoming chunks and process them in batches:

import asyncio
from collections import deque


class BufferedHandler:
    """Buffer incoming data and process it in batches at a fixed interval."""

    def __init__(self, flush_interval: float = 0.1):
        self.buffer = deque()  # deque appends and pops are thread-safe
        self.flush_interval = flush_interval  # Seconds between batch flushes

    def add(self, data: bytes):
        """Append data to the buffer (called by the stream)."""
        self.buffer.append(data)

    async def run(self):
        """Continuously flush buffered data at the configured interval."""
        while True:
            # Drain with popleft() instead of list() + clear(): if add() is
            # ever called from another thread, a chunk appended between the
            # snapshot and the clear would otherwise be lost.
            batch = []
            while self.buffer:
                batch.append(self.buffer.popleft())
            if batch:
                await self.process_batch(batch)
            await asyncio.sleep(self.flush_interval)

    async def process_batch(self, batch: list[bytes]):
        """Process a batch of output chunks at once."""
        combined = b"".join(batch)
        # errors="replace": a batch may end in the middle of a multi-byte character
        print(combined.decode(errors="replace"), end="", flush=True)


# Wire the buffered handler into the stream
handler = BufferedHandler()
stream.on_output(handler.add)

# Start the flush loop in the background. Keep a reference to the task: the
# event loop only holds weak references, so an unreferenced task may be
# garbage-collected before it finishes.
flush_task = asyncio.create_task(handler.run())

How do I stream from multiple machines?

import asyncio

servers = ["web-1", "web-2", "web-3"]


async def stream_from_servers(client, servers: list[str], duration: float = 60) -> None:
    """Stream output from multiple machines simultaneously.

    Args:
        client: A connected CMDOP client.
        servers: Hostnames whose active sessions should be streamed.
        duration: Seconds to keep the streams open.
    """
    streams = []
    try:
        for server in servers:
            # Get each server's active session and create a stream
            session = await client.terminal.get_active_session(server)
            stream = client.terminal.stream()
            # Track the stream before attaching so it is closed even if attach fails
            streams.append(stream)

            # Prefix each chunk with the server name for identification.
            # s=server binds the current name; a plain closure would see only the last one.
            stream.on_output(
                lambda data, s=server: print(f"[{s}] {data.decode()}", end="")
            )

            # Attach the stream to the session
            await stream.attach(session.session_id)

        # All streams are now active and receiving output concurrently
        await asyncio.sleep(duration)
    finally:
        # Cleanup: close all streams when done, including after an error
        for stream in streams:
            await stream.close()


# Usage (inside a coroutine):
#     await stream_from_servers(client, servers)

How do I forward to a WebSocket?

from cmdop import AsyncCMDOPClient
from fastapi import WebSocket


async def stream_to_ws(websocket: WebSocket, hostname: str):
    """Bridge a terminal session to a WebSocket client (e.g. browser terminal).

    Args:
        websocket: The client WebSocket; presumably already accepted by the
            route handler — confirm in the caller.
        hostname: Machine whose active terminal session is bridged.
    """
    async with AsyncCMDOPClient.remote(api_key="cmd_xxx") as client:
        session = await client.terminal.get_active_session(hostname)
        stream = client.terminal.stream()

        # Forward terminal output to the WebSocket client
        async def forward(data: bytes):
            await websocket.send_bytes(data)

        stream.on_output_async(forward)

        try:
            await stream.attach(session.session_id)

            # Receive user input from the WebSocket and send to the terminal.
            # NOTE(review): the loop presumably ends when receive_bytes()
            # raises on client disconnect — confirm against FastAPI/Starlette.
            while True:
                data = await websocket.receive_bytes()
                await stream.send_input(data)
        finally:
            # Always close the stream to release resources
            await stream.close()

How do I stream AI agent events?

# Iterate over AI agent events as they are generated async for event in client.agent.run_stream( prompt="Analyze logs", output_schema=Analysis ): match event.type: case "thinking": # Agent is reasoning about the task print(f"Thinking: {event.content}") case "tool_call": # Agent is invoking a tool (e.g. shell command, API call) print(f"Using tool: {event.tool}") print(f"Args: {event.args}") case "tool_result": # Tool execution completed, showing truncated result print(f"Tool output: {event.result[:100]}...") case "result": # Final structured result matching the output schema analysis: Analysis = event.output print(f"Final result: {analysis}") case "error": # An error occurred during agent execution print(f"Error: {event.error}")

How do I monitor session health?

import asyncio
import time


async def monitor_session(
    session_id: str,
    idle_timeout: float = 300,
    check_interval: float = 30,
):
    """Monitor a session for idle timeout and send a health-check ping.

    Uses the `stream` object from the enclosing scope.

    Args:
        session_id: Session to attach the stream to.
        idle_timeout: Seconds without output before a ping is sent (default 5 minutes).
        check_interval: Seconds between idle checks.
    """
    # time.monotonic() is unaffected by system clock adjustments, unlike
    # time.time(), so the idle measurement cannot jump.
    last_activity = time.monotonic()  # Last time output was received

    def on_output(data):
        nonlocal last_activity
        last_activity = time.monotonic()  # Reset the idle timer on every output
        # Process output...

    stream.on_output(on_output)
    await stream.attach(session_id)

    while True:
        await asyncio.sleep(check_interval)
        idle_time = time.monotonic() - last_activity
        if idle_time > idle_timeout:
            print("Session idle, checking health...")
            await stream.send_input(b"echo ping\n")  # Send a keepalive command

How do I handle backpressure?

import asyncio
import codecs


class BackpressureHandler:
    """Rate-limit processing when output arrives faster than it can be consumed."""

    def __init__(self, max_queue: int = 1000):
        self.queue = asyncio.Queue(maxsize=max_queue)  # Bounded queue
        # Incremental decoder: a multi-byte UTF-8 character may be split
        # across chunks, and decoding chunks independently would raise.
        self._decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")

    def add(self, data: bytes):
        """Enqueue data; drop it if the queue is full (backpressure)."""
        try:
            self.queue.put_nowait(data)
        except asyncio.QueueFull:
            # Queue overflow: oldest data is preserved, newest is dropped
            print("Warning: Queue full, dropping data")

    async def consume(self):
        """Continuously dequeue and process data."""
        while True:
            data = await self.queue.get()  # Block until data is available
            try:
                await self.process(data)
            finally:
                # Mark the item as processed even if process() raised, so the
                # queue's unfinished-task count stays accurate.
                self.queue.task_done()

    async def process(self, data: bytes):
        """Process a single chunk at a controlled rate."""
        await asyncio.sleep(0.01)  # Throttle processing speed
        print(self._decoder.decode(data), end="")

How do I log stream output?

import logging


class StreamLogger:
    """Wrap stream output in Python's logging framework."""

    def __init__(self, hostname: str):
        # One namespaced logger per machine
        logger_name = f"terminal.{hostname}"
        self.logger = logging.getLogger(logger_name)
        self.logger.setLevel(logging.DEBUG)

    def log_output(self, data: bytes):
        """Log each output chunk as a DEBUG message."""
        message = data.decode().strip()
        self.logger.debug(message)


# Create a logger for the target host and register it
logger = StreamLogger("my-server")
stream.on_output(logger.log_output)

What are the streaming best practices?

1. Flush Output

def on_output(data):
    """Print a terminal output chunk immediately."""
    # errors="replace": a chunk can end in the middle of a multi-byte
    # character, and a strict decode would raise UnicodeDecodeError.
    text = data.decode(errors="replace")
    # Always flush to ensure output appears immediately in the terminal
    print(text, end="", flush=True)  # flush=True!

2. Use Async for I/O

# Good
async def on_output(data):
    """Async handler: won't block the event loop during I/O."""
    await save_to_db(data)


# Avoid
def on_output(data):
    """Sync handler: blocks the event loop until the write completes."""
    save_to_db_sync(data)  # Blocks!

3. Handle Disconnects

# Always register both disconnect and reconnect handlers stream.on_disconnect(handle_disconnect) stream.on_reconnect(handle_reconnect)

4. Clean Up

async def run_with_cleanup(stream, session) -> None:
    """Attach the stream to the session and guarantee it is closed afterwards.

    `await` is only valid inside a coroutine, so the try/finally lives in a function.
    """
    try:
        await stream.attach(session.session_id)
        # Work...
    finally:
        # Always close the stream to release resources
        await stream.close()

Next

Last updated on