Real-Time Streaming
TL;DR
Stream command output in real-time using on_output callbacks instead of waiting for completion. Supports sync and async handlers, pattern detection for alerts, stdout/stderr separation, and binary output. Use buffered streaming for high-volume output, and FlowControlledHandler for backpressure. Forward streams to WebSockets, databases, or multiple handlers simultaneously.
Process command output as it arrives, not after completion.
Why use streaming instead of waiting for output?
Without streaming:
$ ./build.sh
[wait 5 minutes...]
[get all output at once]
With streaming:
$ ./build.sh
[Step 1/5] Installing...
[Step 2/5] Compiling... <- see immediately
[Step 3/5] Testing...
...How do I set up basic output streaming?
from cmdop import AsyncCMDOPClient
async with AsyncCMDOPClient.remote(api_key="cmd_xxx") as client:
session = await client.terminal.get_active_session("my-server")
stream = client.terminal.stream()
# Define a callback that fires for each chunk of output received
def on_output(data: bytes):
# Decode bytes to string; flush=True ensures immediate display
print(data.decode(), end="", flush=True)
# Register the callback to receive real-time output
stream.on_output(on_output)
# Attach to the session and start a long-running build
await stream.attach(session.session_id)
await stream.send_input(b"./long_build.sh\n")
# Keep the connection open to continue receiving output (5 minutes)
await asyncio.sleep(300)How do I use async output handlers?
async def handle_output(data: bytes):
text = data.decode()
# Perform async I/O operations like database writes and WebSocket sends
await save_to_database(text)
await send_to_websocket(text)
# Detect error patterns in real-time and trigger alerts
if "error" in text.lower():
await send_alert(text)
# Use on_output_async for handlers that need to await async operations
stream.on_output_async(handle_output)How do I collect all output into a buffer?
# Initialize a list to accumulate output chunks as they arrive
output_buffer = []
def collect(data: bytes):
# Append each raw byte chunk to the buffer
output_buffer.append(data)
stream.on_output(collect)
await stream.attach(session.session_id)
await stream.send_input(b"ls -la\n")
await asyncio.sleep(2)
# Join all chunks and decode to get the complete output as a string
full_output = b"".join(output_buffer).decode()
print(full_output)How do I wait for a specific pattern in the output?
import asyncio
async def wait_for_prompt():
# Create an asyncio Event that will be set when the pattern is found
found = asyncio.Event()
def check_output(data: bytes):
# Check if the shell prompt character appears in the output
if b"$" in data or b">" in data:
found.set() # Signal that the pattern was detected
stream.on_output(check_output)
await stream.attach(session.session_id)
await stream.send_input(b"cd /app\n")
# Block until the prompt appears or timeout after 10 seconds
await asyncio.wait_for(found.wait(), timeout=10)
print("Command completed!")How do I handle different output types?
How do I separate stdout from stderr?
def on_output(data: bytes, stream_type: str):
# Route output to the correct system stream based on its type
if stream_type == "stdout":
sys.stdout.buffer.write(data) # Normal output goes to stdout
elif stream_type == "stderr":
sys.stderr.buffer.write(data) # Error output goes to stderr
stream.on_output(on_output)How do I handle binary output?
# Binary data (e.g., from cat-ing a file) is passed as raw bytes
def on_binary_output(data: bytes):
# Append binary data to a file (open in append-binary mode)
with open("output.bin", "ab") as f:
f.write(data)
stream.on_output(on_binary_output)
# Stream the contents of a binary file from the remote machine
await stream.send_input(b"cat /path/to/binary.file\n")How do I process logs in real-time?
import re
class LogProcessor:
def __init__(self):
self.error_count = 0
self.warning_count = 0
def process(self, data: bytes):
text = data.decode()
# Split output into individual lines and classify each one
for line in text.split("\n"):
if re.search(r"ERROR|FATAL", line):
self.error_count += 1
self.alert(line) # Immediately alert on errors
elif re.search(r"WARN", line):
self.warning_count += 1
def alert(self, message):
# Forward critical messages to external services (Slack, PagerDuty, etc.)
send_notification(message)
processor = LogProcessor()
# Register the processor's process method as the output handler
stream.on_output(processor.process)
await stream.attach(session.session_id)
# Start tailing the log file for continuous monitoring
await stream.send_input(b"tail -f /var/log/app.log\n")How do I register multiple output handlers?
# Handler 1: print output to the console in real-time
stream.on_output(lambda d: print(d.decode(), end=""))
# Handler 2: write raw bytes to a log file for later review
log_file = open("session.log", "ab")
stream.on_output(lambda d: log_file.write(d))
# Handler 3: check each chunk for error patterns and send alerts
stream.on_output(lambda d: alert_if_error(d))How do I use buffered streaming for high-volume output?
For performance with high-volume output:
import asyncio
from collections import deque
class BufferedHandler:
def __init__(self, flush_interval=0.1):
self.buffer = deque() # Thread-safe double-ended queue
self.flush_interval = flush_interval # Seconds between flushes
def add(self, data: bytes):
# Append incoming data to the buffer (called per output chunk)
self.buffer.append(data)
async def run(self):
while True:
if self.buffer:
# Drain the entire buffer into a batch for processing
batch = list(self.buffer)
self.buffer.clear()
await self.process_batch(batch)
# Wait before checking for more data (controls processing rate)
await asyncio.sleep(self.flush_interval)
async def process_batch(self, batch):
# Combine all buffered chunks into one for efficient processing
combined = b"".join(batch)
print(combined.decode(), end="")
handler = BufferedHandler()
stream.on_output(handler.add)
# Run the buffer processor as a background task alongside the stream
asyncio.create_task(handler.run())How do I forward a stream to a WebSocket?
from fastapi import WebSocket
async def stream_to_websocket(websocket: WebSocket, hostname: str):
async with AsyncCMDOPClient.remote(api_key="cmd_xxx") as client:
session = await client.terminal.get_active_session(hostname)
stream = client.terminal.stream()
# Forward each output chunk from the terminal to the WebSocket client
async def forward_output(data: bytes):
await websocket.send_bytes(data)
stream.on_output_async(forward_output)
await stream.attach(session.session_id)
# Receive user input from the WebSocket and forward to the terminal
while True:
data = await websocket.receive_bytes()
await stream.send_input(data)How do I handle backpressure in streams?
import asyncio
class FlowControlledHandler:
def __init__(self, max_queue=1000):
# Bounded queue prevents unbounded memory growth under high load
self.queue = asyncio.Queue(maxsize=max_queue)
def add(self, data: bytes):
try:
# Non-blocking put: add data if queue has capacity
self.queue.put_nowait(data)
except asyncio.QueueFull:
# Drop oldest data or handle overflow when consumer is too slow
pass
async def consume(self):
while True:
# Block until data is available, then process it
data = await self.queue.get()
await self.process(data)
self.queue.task_done() # Signal that this item is doneWhat are the performance tips for streaming?
Why should I flush output immediately?
def on_output(data):
# flush=True forces immediate write to the terminal (no buffering delay)
print(data.decode(), end="", flush=True)Why should I use async handlers for I/O?
# Good: async handler does not block the event loop during network/disk I/O
async def on_output(data):
await save_to_db(data)
# Avoid: sync handler blocks the event loop while waiting for I/O
def on_output(data):
save_to_db_sync(data) # Blocks event loop, delays all other tasksWhen should I use buffering?
# Use a short flush interval for streams producing thousands of lines/second
handler = BufferedHandler(flush_interval=0.05) # Flush every 50msNext
Last updated on