
Real-Time Streaming

TL;DR

Stream command output in real-time using on_output callbacks instead of waiting for completion. Supports sync and async handlers, pattern detection for alerts, stdout/stderr separation, and binary output. Use buffered streaming for high-volume output, and FlowControlledHandler for backpressure. Forward streams to WebSockets, databases, or multiple handlers simultaneously.

Process command output as it arrives, not after completion.

Why use streaming instead of waiting for output?

```
Without streaming:
$ ./build.sh
[wait 5 minutes...]
[get all output at once]

With streaming:
$ ./build.sh
[Step 1/5] Installing...
[Step 2/5] Compiling...   <- see immediately
[Step 3/5] Testing...
...
```
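The callback pattern is not specific to any one SDK; the same idea can be sketched with Python's standard library alone. The `stream_command` helper below is a hypothetical illustration, not part of the cmdop API — it invokes a callback per line as the process produces output, rather than waiting for completion:

```python
import subprocess
import sys

def stream_command(args, on_line):
    # Launch the process and read its output line-by-line as it is
    # produced, firing the callback immediately for each line.
    proc = subprocess.Popen(
        args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
    )
    for line in proc.stdout:
        on_line(line.rstrip("\n"))
    proc.wait()
    return proc.returncode

lines = []
code = stream_command(
    [sys.executable, "-c", "print('step 1'); print('step 2')"],
    lines.append,
)
```

Each line reaches the callback as soon as the child process emits it, which is what makes the "[Step 2/5] Compiling..." feedback above possible.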

How do I set up basic output streaming?

```python
import asyncio

from cmdop import AsyncCMDOPClient

async with AsyncCMDOPClient.remote(api_key="cmd_xxx") as client:
    session = await client.terminal.get_active_session("my-server")
    stream = client.terminal.stream()

    # Define a callback that fires for each chunk of output received
    def on_output(data: bytes):
        # Decode bytes to string; flush=True ensures immediate display
        print(data.decode(), end="", flush=True)

    # Register the callback to receive real-time output
    stream.on_output(on_output)

    # Attach to the session and start a long-running build
    await stream.attach(session.session_id)
    await stream.send_input(b"./long_build.sh\n")

    # Keep the connection open to continue receiving output (5 minutes)
    await asyncio.sleep(300)
```

How do I use async output handlers?

```python
async def handle_output(data: bytes):
    text = data.decode()

    # Perform async I/O operations like database writes and WebSocket sends
    await save_to_database(text)
    await send_to_websocket(text)

    # Detect error patterns in real-time and trigger alerts
    if "error" in text.lower():
        await send_alert(text)

# Use on_output_async for handlers that need to await async operations
stream.on_output_async(handle_output)
```

How do I collect all output into a buffer?

```python
# Initialize a list to accumulate output chunks as they arrive
output_buffer = []

def collect(data: bytes):
    # Append each raw byte chunk to the buffer
    output_buffer.append(data)

stream.on_output(collect)
await stream.attach(session.session_id)
await stream.send_input(b"ls -la\n")
await asyncio.sleep(2)

# Join all chunks and decode to get the complete output as a string
full_output = b"".join(output_buffer).decode()
print(full_output)
```

How do I wait for a specific pattern in the output?

```python
import asyncio

async def wait_for_prompt():
    # Create an asyncio Event that will be set when the pattern is found
    found = asyncio.Event()

    def check_output(data: bytes):
        # Check if the shell prompt character appears in the output
        if b"$" in data or b">" in data:
            found.set()  # Signal that the pattern was detected

    stream.on_output(check_output)
    await stream.attach(session.session_id)
    await stream.send_input(b"cd /app\n")

    # Block until the prompt appears or timeout after 10 seconds
    await asyncio.wait_for(found.wait(), timeout=10)
    print("Command completed!")
```
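The `asyncio.Event` mechanics can be exercised without a live session. This self-contained sketch feeds simulated chunks (the chunk values are invented for illustration) through the same kind of prompt-detecting callback:

```python
import asyncio

async def demo():
    found = asyncio.Event()

    def check_output(data: bytes):
        # Same prompt check as the callback above
        if b"$" in data or b">" in data:
            found.set()

    # Chunks arriving the way a stream might deliver them
    for chunk in (b"cd /app\n", b"user@host:/app$ "):
        check_output(chunk)

    # Returns immediately because the event was already set
    await asyncio.wait_for(found.wait(), timeout=1)
    return found.is_set()

ok = asyncio.run(demo())
```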

How do I handle different output types?

How do I separate stdout from stderr?

```python
import sys

def on_output(data: bytes, stream_type: str):
    # Route output to the correct system stream based on its type
    if stream_type == "stdout":
        sys.stdout.buffer.write(data)  # Normal output goes to stdout
    elif stream_type == "stderr":
        sys.stderr.buffer.write(data)  # Error output goes to stderr

stream.on_output(on_output)
```

How do I handle binary output?

```python
# Binary data (e.g., from cat-ing a file) is passed as raw bytes
def on_binary_output(data: bytes):
    # Append binary data to a file (open in append-binary mode)
    with open("output.bin", "ab") as f:
        f.write(data)

stream.on_output(on_binary_output)

# Stream the contents of a binary file from the remote machine
await stream.send_input(b"cat /path/to/binary.file\n")
```

How do I process logs in real-time?

```python
import re

class LogProcessor:
    def __init__(self):
        self.error_count = 0
        self.warning_count = 0

    def process(self, data: bytes):
        text = data.decode()
        # Split output into individual lines and classify each one
        for line in text.split("\n"):
            if re.search(r"ERROR|FATAL", line):
                self.error_count += 1
                self.alert(line)  # Immediately alert on errors
            elif re.search(r"WARN", line):
                self.warning_count += 1

    def alert(self, message):
        # Forward critical messages to external services (Slack, PagerDuty, etc.)
        send_notification(message)

processor = LogProcessor()

# Register the processor's process method as the output handler
stream.on_output(processor.process)
await stream.attach(session.session_id)

# Start tailing the log file for continuous monitoring
await stream.send_input(b"tail -f /var/log/app.log\n")
```

How do I register multiple output handlers?

```python
# Handler 1: print output to the console in real-time
stream.on_output(lambda d: print(d.decode(), end=""))

# Handler 2: write raw bytes to a log file for later review
log_file = open("session.log", "ab")
stream.on_output(lambda d: log_file.write(d))

# Handler 3: check each chunk for error patterns and send alerts
stream.on_output(lambda d: alert_if_error(d))
```

How do I use buffered streaming for high-volume output?

For performance with high-volume output:

```python
import asyncio
from collections import deque

class BufferedHandler:
    def __init__(self, flush_interval=0.1):
        self.buffer = deque()                 # Thread-safe double-ended queue
        self.flush_interval = flush_interval  # Seconds between flushes

    def add(self, data: bytes):
        # Append incoming data to the buffer (called per output chunk)
        self.buffer.append(data)

    async def run(self):
        while True:
            if self.buffer:
                # Drain the entire buffer into a batch for processing
                batch = list(self.buffer)
                self.buffer.clear()
                await self.process_batch(batch)
            # Wait before checking for more data (controls processing rate)
            await asyncio.sleep(self.flush_interval)

    async def process_batch(self, batch):
        # Combine all buffered chunks into one for efficient processing
        combined = b"".join(batch)
        print(combined.decode(), end="")

handler = BufferedHandler()
stream.on_output(handler.add)

# Run the buffer processor as a background task alongside the stream
asyncio.create_task(handler.run())
```

How do I forward a stream to a WebSocket?

```python
from fastapi import WebSocket

from cmdop import AsyncCMDOPClient

async def stream_to_websocket(websocket: WebSocket, hostname: str):
    async with AsyncCMDOPClient.remote(api_key="cmd_xxx") as client:
        session = await client.terminal.get_active_session(hostname)
        stream = client.terminal.stream()

        # Forward each output chunk from the terminal to the WebSocket client
        async def forward_output(data: bytes):
            await websocket.send_bytes(data)

        stream.on_output_async(forward_output)
        await stream.attach(session.session_id)

        # Receive user input from the WebSocket and forward to the terminal
        while True:
            data = await websocket.receive_bytes()
            await stream.send_input(data)
```

How do I handle backpressure in streams?

```python
import asyncio

class FlowControlledHandler:
    def __init__(self, max_queue=1000):
        # Bounded queue prevents unbounded memory growth under high load
        self.queue = asyncio.Queue(maxsize=max_queue)

    def add(self, data: bytes):
        try:
            # Non-blocking put: add data if queue has capacity
            self.queue.put_nowait(data)
        except asyncio.QueueFull:
            # Drop the newest chunk (or handle overflow another way)
            # when the consumer is too slow
            pass

    async def consume(self):
        while True:
            # Block until data is available, then process it
            data = await self.queue.get()
            await self.process(data)
            self.queue.task_done()  # Signal that this item is done
```
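The drop-on-full policy can be seen in isolation with plain `asyncio`, no stream attached. This self-contained sketch (the `demo` function and chunk values are invented for illustration) fills a bounded queue faster than it is drained:

```python
import asyncio

async def demo():
    # Bounded queue: once full, the producer drops chunks instead of
    # letting memory grow while the consumer falls behind.
    queue = asyncio.Queue(maxsize=3)
    dropped = 0

    def add(data: bytes):
        nonlocal dropped
        try:
            queue.put_nowait(data)
        except asyncio.QueueFull:
            dropped += 1  # Overflow policy: drop the newest chunk

    # A fast producer delivers five chunks before the consumer runs
    for i in range(5):
        add(f"chunk-{i}".encode())

    consumed = []
    while not queue.empty():
        consumed.append(await queue.get())
        queue.task_done()
    return consumed, dropped

consumed, dropped = asyncio.run(demo())
```

With `maxsize=3`, the first three chunks are queued and the last two are dropped — the trade-off a bounded queue makes to protect memory under sustained overload.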

What are the performance tips for streaming?

Why should I flush output immediately?

```python
def on_output(data):
    # flush=True forces immediate write to the terminal (no buffering delay)
    print(data.decode(), end="", flush=True)
```

Why should I use async handlers for I/O?

```python
# Good: async handler does not block the event loop during network/disk I/O
async def on_output(data):
    await save_to_db(data)

# Avoid: sync handler blocks the event loop while waiting for I/O
def on_output(data):
    save_to_db_sync(data)  # Blocks event loop, delays all other tasks
```
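Why this matters can be demonstrated standalone: when an async handler awaits, the event loop runs other tasks in the meantime. In this illustrative sketch, two handlers interleave instead of running back-to-back:

```python
import asyncio

async def demo():
    order = []

    async def async_handler(tag: str):
        order.append(f"start-{tag}")
        await asyncio.sleep(0.01)  # Non-blocking: other tasks run meanwhile
        order.append(f"end-{tag}")

    # Both handlers start before either finishes: each await yields control
    await asyncio.gather(async_handler("a"), async_handler("b"))
    return order

order = asyncio.run(demo())
```

A blocking sync handler in the same position would force `start-a, end-a, start-b, end-b` — every other task waits for the I/O to finish.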

When should I use buffering?

```python
# Use a short flush interval for streams producing thousands of lines/second
handler = BufferedHandler(flush_interval=0.05)  # Flush every 50ms
```
