Skip to Content

Fleet Management

TL;DR

CMDOP fleet management lets you orchestrate AI-driven operations across multiple machines simultaneously. It supports parallel health checks, rolling and canary deployments with automatic rollback, real-time fleet dashboards, parallel command execution, cross-server log aggregation, configuration synchronization, and automated remediation of common issues like high disk or memory usage.

Use AI to manage multiple machines simultaneously.

How do I run AI operations across multiple machines?

from cmdop import AsyncCMDOPClient from pydantic import BaseModel import asyncio # Schema for per-server health report class ServerStatus(BaseModel): hostname: str healthy: bool cpu_percent: float memory_percent: float issues: list[str] async with AsyncCMDOPClient.remote(api_key="cmd_xxx") as client: servers = ["web-1", "web-2", "web-3", "db-1"] async def check_server(hostname: str) -> ServerStatus: # Switch the terminal session to the target machine await client.terminal.set_machine(hostname) # AI checks health and returns structured status result = await client.agent.run( prompt="Check server health and identify any issues", output_schema=ServerStatus ) return result.output # Run all health checks in parallel using asyncio.gather statuses = await asyncio.gather(*[check_server(s) for s in servers]) # Print a summary for each server for status in statuses: icon = "✓" if status.healthy else "✗" print(f"{icon} {status.hostname}: CPU {status.cpu_percent}%") for issue in status.issues: print(f" ⚠ {issue}")

How do I perform a rolling deployment across servers?

# Schema for individual server deployment outcome class DeployResult(BaseModel): hostname: str success: bool version: str duration_seconds: float error: str | None async def rolling_deploy(servers: list[str], version: str): results = [] # Deploy to each server one at a time (sequentially) for server in servers: print(f"Deploying to {server}...") await client.terminal.set_machine(server) # AI runs the full deploy pipeline on this server result = await client.agent.run( prompt=f""" Deploy version {version}: 1. Pull latest code 2. Install dependencies 3. Run migrations 4. Restart service 5. Health check If health check fails, rollback immediately. """, output_schema=DeployResult ) results.append(result.output) # Stop the entire rollout if any server fails if not result.output.success: print(f"❌ Failed on {server}: {result.output.error}") print("Stopping deployment") break else: print(f"✓ {server} deployed in {result.output.duration_seconds}s") return results results = await rolling_deploy( ["web-1", "web-2", "web-3"], version="2.1.0" )

How do I set up a canary deployment?

async def canary_deploy(
    canary_server: str,
    production_servers: list[str],
    version: str,
    canary_duration: int = 300  # monitoring window in seconds (5 minutes)
):
    """Deploy to one canary, watch it, then roll out to production if healthy."""
    # Phase 1: deploy only to the canary machine.
    print(f"Deploying to canary: {canary_server}")
    await client.terminal.set_machine(canary_server)
    canary_result = await client.agent.run(
        prompt=f"Deploy version {version} and verify",
        output_schema=DeployResult
    )
    if not canary_result.output.success:
        return {"success": False, "stage": "canary", "error": canary_result.output.error}

    # Phase 2: poll canary health every 30 seconds for the whole window.
    print(f"Monitoring canary for {canary_duration}s...")

    class CanaryHealth(BaseModel):
        error_rate: float
        latency_p99: float
        healthy: bool

    for _ in range(canary_duration // 30):
        await asyncio.sleep(30)
        health = await client.agent.run(
            prompt="Check error rate and latency",
            output_schema=CanaryHealth
        )
        # Bail out and roll the canary back past a 1% error-rate threshold.
        if not health.output.healthy or health.output.error_rate > 0.01:
            await client.agent.run("Rollback to previous version")
            return {"success": False, "stage": "canary_monitoring"}

    # Phase 3: canary survived the window -- roll out to every production server.
    print("Canary healthy, deploying to production...")
    for server in production_servers:
        await client.terminal.set_machine(server)
        result = await client.agent.run(
            prompt=f"Deploy version {version}",
            output_schema=DeployResult
        )
        if not result.output.success:
            # Abort (and roll back) if any production server fails.
            return {"success": False, "stage": "production", "server": server}

    return {"success": True}

How do I build a fleet health dashboard?

# Per-machine health metrics
class MachineHealth(BaseModel):
    hostname: str
    status: str  # healthy, warning, critical
    cpu_percent: float
    memory_percent: float
    disk_percent: float
    uptime_hours: float
    services_up: int
    services_down: int

# Aggregated fleet-level summary
class FleetHealth(BaseModel):
    total_machines: int
    healthy: int
    warning: int
    critical: int
    machines: list[MachineHealth]

async def fleet_health_check(servers: list[str]) -> FleetHealth:
    """Check every server concurrently and roll the results into one summary."""
    async def check_one(hostname: str) -> MachineHealth:
        await client.terminal.set_machine(hostname)
        # AI gathers system metrics and service statuses
        result = await client.agent.run(
            "Check system health, services status",
            output_schema=MachineHealth
        )
        return result.output

    # Check all machines in parallel for speed
    machines = await asyncio.gather(*[check_one(s) for s in servers])

    # Build the aggregate summary from the individual results.
    # Fixed: the original read `len(./machines)` (a typo) — it must be `len(machines)`.
    return FleetHealth(
        total_machines=len(machines),
        healthy=len([m for m in machines if m.status == "healthy"]),
        warning=len([m for m in machines if m.status == "warning"]),
        critical=len([m for m in machines if m.status == "critical"]),
        machines=machines
    )

# Continuous monitoring loop that refreshes every 60 seconds
async def monitor_fleet():
    servers = ["web-1", "web-2", "web-3", "db-1", "cache-1"]
    while True:
        health = await fleet_health_check(servers)
        print("\n=== Fleet Status ===")
        print(f"✓ Healthy: {health.healthy}")
        print(f"⚠ Warning: {health.warning}")
        print(f"✗ Critical: {health.critical}")

        # Only print details for machines that need attention
        for machine in health.machines:
            if machine.status != "healthy":
                print(f"\n{machine.hostname} ({machine.status}):")
                print(f" CPU: {machine.cpu_percent}%")
                print(f" Memory: {machine.memory_percent}%")
                print(f" Services down: {machine.services_down}")

        await asyncio.sleep(60)

How do I execute commands on all servers in parallel?

async def run_on_all(servers: list[str], command: str): """Run same command on all servers.""" # Schema for the output of a single command execution class CommandResult(BaseModel): hostname: str exit_code: int output: str async def run_one(hostname: str) -> CommandResult: await client.terminal.set_machine(hostname) # AI executes the given command and captures the result result = await client.agent.run( f"Run: {command}", output_schema=CommandResult ) return result.output # Fire all commands in parallel using asyncio.gather return await asyncio.gather(*[run_one(s) for s in servers]) # Example: update packages on all web servers at once results = await run_on_all( ["web-1", "web-2", "web-3"], "apt update && apt upgrade -y" )

How do I aggregate logs from multiple servers?

# One log line together with the server it came from.
class LogEntry(BaseModel):
    timestamp: str
    level: str
    message: str
    source: str

# Fleet-wide view of matching log entries.
class AggregatedLogs(BaseModel):
    total_errors: int
    servers_with_errors: list[str]
    unique_errors: list[str]
    entries: list[LogEntry]

async def aggregate_logs(servers: list[str], pattern: str = "ERROR"):
    """Collect matching log lines from every server into one report."""
    all_entries = []
    for server in servers:
        await client.terminal.set_machine(server)

        class ServerLogs(BaseModel):
            entries: list[LogEntry]

        # The agent greps this server's logs for the requested pattern.
        result = await client.agent.run(
            f"Find log entries matching '{pattern}' from last hour",
            output_schema=ServerLogs
        )
        # Stamp each entry with its origin server before collecting it.
        for entry in result.output.entries:
            entry.source = server
            all_entries.append(entry)

    # Deduplicate messages and record which servers produced any hits.
    unique_messages = set(e.message for e in all_entries)
    servers_with_errors = set(e.source for e in all_entries)

    # Cap the report: 10 distinct messages, 100 newest entries.
    return AggregatedLogs(
        total_errors=len(all_entries),
        servers_with_errors=list(servers_with_errors),
        unique_errors=list(unique_messages)[:10],
        entries=sorted(all_entries, key=lambda e: e.timestamp, reverse=True)[:100]
    )

How do I synchronize configuration across servers?

async def sync_config(servers: list[str], config_content: str, config_path: str):
    """Ensure all servers have same config."""
    # Records what happened on each server: unchanged, updated, or created.
    class SyncResult(BaseModel):
        hostname: str
        action: str  # unchanged, updated, created
        previous_hash: str | None
        current_hash: str

    results = []
    for server in servers:
        await client.terminal.set_machine(server)
        # The agent hashes the existing file and rewrites it only on mismatch.
        result = await client.agent.run(
            f"""
            Sync config file at {config_path}:
            1. Check if file exists and get its hash
            2. If different or missing, write new content
            3. Report what action was taken

            New content:
            {config_content}
            """,
            output_schema=SyncResult
        )
        results.append(result.output)
    return results

How do I set up automated remediation for common issues?

async def auto_remediate():
    """Automatically fix common issues."""
    # One detected problem, with a flag for whether it can be fixed unattended.
    class Issue(BaseModel):
        type: str
        severity: str
        description: str
        auto_fixable: bool

    # Remediation summary for a single server.
    class RemediationResult(BaseModel):
        hostname: str
        issues_found: list[Issue]
        issues_fixed: list[str]
        manual_intervention_needed: list[str]

    servers = ["web-1", "web-2", "web-3"]
    results = []
    for server in servers:
        await client.terminal.set_machine(server)
        # The agent scans for the known issue classes and repairs what it safely can.
        result = await client.agent.run(
            """
            Check for common issues and auto-fix if possible:
            1. High disk usage (>90%) → clean old logs
            2. High memory usage → restart leaky service
            3. Failed services → restart them
            4. Expired certificates → alert (don't fix)

            Report what was found and fixed.
            """,
            output_schema=RemediationResult
        )
        results.append(result.output)
        # Anything the agent would not fix itself gets escalated to a human.
        for issue in result.output.manual_intervention_needed:
            alert_team(server, issue)
    return results

What are the best practices for fleet management?

1. Use Parallel Execution for Independent Operations

# Good: parallel health checks -- all servers checked simultaneously await asyncio.gather(*[check(s) for s in servers]) # Bad: sequential when not needed -- each waits for the previous for s in servers: await check(s)

2. Implement Circuit Breakers

# Stop the rollout after too many consecutive failures failures = 0 max_failures = 3 for server in servers: if failures >= max_failures: print("Too many failures, stopping") break result = await deploy(server) if not result.success: failures += 1

3. Log Everything

import logging

# A dedicated "fleet" logger keeps these operations easy to filter.
logger = logging.getLogger("fleet")

async def deploy_with_logging(server: str):
    """Wrap a deploy so its start and outcome are always logged."""
    logger.info(f"Starting deploy on {server}")
    result = await deploy(server)
    logger.info(f"Deploy on {server}: {result.success}")
    return result

4. Use Dry Run First

# Step 1: Plan in dry-run mode (no side effects) plan = await client.agent.run( "Plan deployment", restrictions={"dry_run": True} ) # Step 2: Review the planned actions print(plan.output.planned_actions) # Step 3: Execute with dry_run disabled after review result = await client.agent.run( "Execute deployment", restrictions={"dry_run": False} )

Next

Last updated on