Fleet Management
TL;DR
CMDOP fleet management lets you orchestrate AI-driven operations across multiple machines simultaneously. It supports parallel health checks, rolling and canary deployments with automatic rollback, real-time fleet dashboards, parallel command execution, cross-server log aggregation, configuration synchronization, and automated remediation of common issues like high disk or memory usage.
Use AI to manage multiple machines simultaneously.
How do I run AI operations across multiple machines?
from cmdop import AsyncCMDOPClient
from pydantic import BaseModel
import asyncio
class ServerStatus(BaseModel):
    """Structured health report for a single server."""

    hostname: str          # machine the report describes
    healthy: bool          # overall pass/fail verdict from the agent
    cpu_percent: float     # CPU utilisation at check time
    memory_percent: float  # memory utilisation at check time
    issues: list[str]      # human-readable problems found; empty when healthy
async with AsyncCMDOPClient.remote(api_key="cmd_xxx") as client:
servers = ["web-1", "web-2", "web-3", "db-1"]
async def check_server(hostname: str) -> ServerStatus:
# Switch the terminal session to the target machine
await client.terminal.set_machine(hostname)
# AI checks health and returns structured status
result = await client.agent.run(
prompt="Check server health and identify any issues",
output_schema=ServerStatus
)
return result.output
# Run all health checks in parallel using asyncio.gather
statuses = await asyncio.gather(*[check_server(s) for s in servers])
# Print a summary for each server
for status in statuses:
icon = "✓" if status.healthy else "✗"
print(f"{icon} {status.hostname}: CPU {status.cpu_percent}%")
for issue in status.issues:
            print(f" ⚠ {issue}")

How do I perform a rolling deployment across servers?
class DeployResult(BaseModel):
    """Outcome of deploying one version to one server."""

    hostname: str            # server the deploy ran on
    success: bool            # whether the whole pipeline completed
    version: str             # version that was deployed (or attempted)
    duration_seconds: float  # wall-clock time the deploy took
    error: str | None        # failure details; None on success
async def rolling_deploy(servers: list[str], version: str):
    """Deploy `version` to each server in turn, halting on the first failure.

    Returns the list of per-server DeployResult objects gathered so far.
    """
    outcomes = []
    for server in servers:
        print(f"Deploying to {server}...")
        # Target the next machine in the rollout order.
        await client.terminal.set_machine(server)
        # The agent executes the full deploy pipeline on that machine.
        result = await client.agent.run(
            prompt=f"""
Deploy version {version}:
1. Pull latest code
2. Install dependencies
3. Run migrations
4. Restart service
5. Health check
If health check fails, rollback immediately.
""",
            output_schema=DeployResult,
        )
        outcome = result.output
        outcomes.append(outcome)
        if outcome.success:
            print(f"✓ {server} deployed in {outcome.duration_seconds}s")
        else:
            # Abort the rollout so later servers keep the old version.
            print(f"❌ Failed on {server}: {outcome.error}")
            print("Stopping deployment")
            break
    return outcomes
# Example: roll version 2.1.0 across the web tier, one server at a time.
results = await rolling_deploy(
["web-1", "web-2", "web-3"],
version="2.1.0"
)

How do I set up a canary deployment?
async def canary_deploy(
    canary_server: str,
    production_servers: list[str],
    version: str,
    canary_duration: int = 300  # monitoring window in seconds (default 5 min)
):
    """Deploy to a single canary first, watch it, then roll out to production.

    Returns a dict with at least a "success" key; failures also report
    which stage failed ("canary", "canary_monitoring", or "production").
    """
    # Phase 1: put the new version on the canary machine only.
    print(f"Deploying to canary: {canary_server}")
    await client.terminal.set_machine(canary_server)
    canary_result = await client.agent.run(
        prompt=f"Deploy version {version} and verify",
        output_schema=DeployResult,
    )
    if not canary_result.output.success:
        return {"success": False, "stage": "canary", "error": canary_result.output.error}

    # Phase 2: sample canary health every 30 seconds for the whole window.
    print(f"Monitoring canary for {canary_duration}s...")

    class CanaryHealth(BaseModel):
        error_rate: float   # fraction of failed requests
        latency_p99: float  # 99th-percentile latency
        healthy: bool       # agent's overall verdict

    for _ in range(canary_duration // 30):
        await asyncio.sleep(30)
        health = await client.agent.run(
            prompt="Check error rate and latency",
            output_schema=CanaryHealth,
        )
        # Roll the canary back if it looks unhealthy or error rate tops 1%.
        if not health.output.healthy or health.output.error_rate > 0.01:
            await client.agent.run("Rollback to previous version")
            return {"success": False, "stage": "canary_monitoring"}

    # Phase 3: the canary held up -- roll out to the remaining servers.
    print("Canary healthy, deploying to production...")
    for server in production_servers:
        await client.terminal.set_machine(server)
        result = await client.agent.run(
            prompt=f"Deploy version {version}",
            output_schema=DeployResult,
        )
        if not result.output.success:
            # A production failure aborts the remainder of the rollout.
            return {"success": False, "stage": "production", "server": server}
    return {"success": True}

How do I build a fleet health dashboard?
class MachineHealth(BaseModel):
    """Health metrics for a single machine."""

    hostname: str
    status: str            # one of: healthy, warning, critical
    cpu_percent: float
    memory_percent: float
    disk_percent: float
    uptime_hours: float
    services_up: int       # services currently running
    services_down: int     # services currently stopped or failed
class FleetHealth(BaseModel):
    """Fleet-wide roll-up of per-machine health reports."""

    total_machines: int            # size of the checked fleet
    healthy: int                   # machines with status == "healthy"
    warning: int                   # machines with status == "warning"
    critical: int                  # machines with status == "critical"
    machines: list[MachineHealth]  # the underlying per-machine reports
async def fleet_health_check(servers: list[str]) -> FleetHealth:
    """Check every server concurrently and aggregate the results.

    Args:
        servers: hostnames to inspect.

    Returns:
        A FleetHealth summary with per-machine reports and status counts.
    """
    async def check_one(hostname: str) -> MachineHealth:
        await client.terminal.set_machine(hostname)
        # The agent gathers system metrics and service statuses remotely.
        result = await client.agent.run(
            "Check system health, services status",
            output_schema=MachineHealth
        )
        return result.output

    # Machines are independent, so check them all in parallel.
    machines = await asyncio.gather(*[check_one(s) for s in servers])

    # Build the aggregate summary from the individual reports.
    return FleetHealth(
        total_machines=len(machines),  # fixed: original `len(./machines)` was a syntax error
        healthy=sum(1 for m in machines if m.status == "healthy"),
        warning=sum(1 for m in machines if m.status == "warning"),
        critical=sum(1 for m in machines if m.status == "critical"),
        machines=machines
    )
async def monitor_fleet():
    """Continuously poll fleet health, printing a refreshed summary each cycle."""
    servers = ["web-1", "web-2", "web-3", "db-1", "cache-1"]
    while True:
        health = await fleet_health_check(servers)
        print(f"\n=== Fleet Status ===")
        print(f"✓ Healthy: {health.healthy}")
        print(f"⚠ Warning: {health.warning}")
        print(f"✗ Critical: {health.critical}")
        # Healthy machines stay quiet; show detail only where action is needed.
        for machine in health.machines:
            if machine.status == "healthy":
                continue
            print(f"\n{machine.hostname} ({machine.status}):")
            print(f" CPU: {machine.cpu_percent}%")
            print(f" Memory: {machine.memory_percent}%")
            print(f" Services down: {machine.services_down}")
        await asyncio.sleep(60)

How do I execute commands on all servers in parallel?
async def run_on_all(servers: list[str], command: str):
    """Run same command on all servers."""

    # Structured result of one command execution on one host.
    class CommandResult(BaseModel):
        hostname: str
        exit_code: int
        output: str

    async def run_one(hostname: str) -> CommandResult:
        # Retarget the session, then have the agent run the command there.
        await client.terminal.set_machine(hostname)
        result = await client.agent.run(
            f"Run: {command}",
            output_schema=CommandResult
        )
        return result.output

    # Launch every per-server task at once and wait for all of them.
    tasks = [run_one(hostname) for hostname in servers]
    return await asyncio.gather(*tasks)
# Example: update packages on all web servers at once
# (each server runs the same apt commands concurrently)
results = await run_on_all(
["web-1", "web-2", "web-3"],
"apt update && apt upgrade -y"
)

How do I aggregate logs from multiple servers?
class LogEntry(BaseModel):
    """One log line, tagged with the server it came from."""

    timestamp: str
    level: str    # log severity label
    message: str
    source: str   # originating hostname; stamped in after collection
class AggregatedLogs(BaseModel):
    """Fleet-wide view of matching log entries."""

    total_errors: int               # matching entries across all servers
    servers_with_errors: list[str]  # hosts that produced at least one match
    unique_errors: list[str]        # de-duplicated message sample
    entries: list[LogEntry]         # newest-first sample of raw entries
async def aggregate_logs(servers: list[str], pattern: str = "ERROR"):
all_entries = []
for server in servers:
await client.terminal.set_machine(server)
class ServerLogs(BaseModel):
entries: list[LogEntry]
# AI searches logs on this server for the given pattern
result = await client.agent.run(
f"Find log entries matching '{pattern}' from last hour",
output_schema=ServerLogs
)
# Tag each entry with the server it came from
for entry in result.output.entries:
entry.source = server
all_entries.append(entry)
# Deduplicate error messages and identify affected servers
unique_messages = set(e.message for e in all_entries)
servers_with_errors = set(e.source for e in all_entries)
return AggregatedLogs(
total_errors=len(all_entries),
servers_with_errors=list(servers_with_errors),
unique_errors=list(unique_messages)[:10],
entries=sorted(all_entries, key=lambda e: e.timestamp, reverse=True)[:100]
    )

How do I synchronize configuration across servers?
async def sync_config(servers: list[str], config_content: str, config_path: str):
    """Ensure all servers have same config."""

    # What happened on each server: unchanged, updated, or created.
    class SyncResult(BaseModel):
        hostname: str
        action: str  # unchanged, updated, created
        previous_hash: str | None
        current_hash: str

    results = []
    for server in servers:
        await client.terminal.set_machine(server)
        # The agent hashes the existing file and rewrites it only on mismatch.
        result = await client.agent.run(
            f"""
Sync config file at {config_path}:
1. Check if file exists and get its hash
2. If different or missing, write new content
3. Report what action was taken
New content:
{config_content}
""",
            output_schema=SyncResult
        )
        results.append(result.output)
    return results

How do I set up automated remediation for common issues?
async def auto_remediate():
    """Automatically fix common issues."""

    # One detected problem and whether the agent may fix it unattended.
    class Issue(BaseModel):
        type: str
        severity: str
        description: str
        auto_fixable: bool

    # What the agent found and did on a single server.
    class RemediationResult(BaseModel):
        hostname: str
        issues_found: list[Issue]
        issues_fixed: list[str]
        manual_intervention_needed: list[str]

    servers = ["web-1", "web-2", "web-3"]
    results = []
    for server in servers:
        await client.terminal.set_machine(server)
        # The agent scans for the known issue types and fixes what it safely can.
        result = await client.agent.run(
            """
Check for common issues and auto-fix if possible:
1. High disk usage (>90%) → clean old logs
2. High memory usage → restart leaky service
3. Failed services → restart them
4. Expired certificates → alert (don't fix)
Report what was found and fixed.
""",
            output_schema=RemediationResult
        )
        results.append(result.output)
        # Anything the agent could not fix safely is escalated to a human.
        # NOTE(review): alert_team is assumed to be defined elsewhere -- confirm.
        for issue in result.output.manual_intervention_needed:
            alert_team(server, issue)
    return results

What are the best practices for fleet management?
1. Use Parallel Execution for Independent Operations
# Good: parallel health checks -- all servers are checked simultaneously,
# so total latency is roughly that of the slowest single check
await asyncio.gather(*[check(s) for s in servers])
# Bad: sequential when not needed -- each check waits for the previous,
# so total latency grows linearly with fleet size
for s in servers:
    await check(s)

2. Implement Circuit Breakers
# Stop the rollout after too many consecutive failures
failures = 0
max_failures = 3
for server in servers:
if failures >= max_failures:
print("Too many failures, stopping")
break
result = await deploy(server)
if not result.success:
        failures += 1

3. Log Everything
import logging

# A dedicated "fleet" logger keeps fleet operations easy to filter.
logger = logging.getLogger("fleet")

async def deploy_with_logging(server: str):
    """Run a deploy on `server`, logging its start and outcome."""
    logger.info(f"Starting deploy on {server}")
    result = await deploy(server)
    logger.info(f"Deploy on {server}: {result.success}")
    return result

4. Use Dry Run First
# Step 1: Plan in dry-run mode (no side effects)
plan = await client.agent.run(
"Plan deployment",
restrictions={"dry_run": True}
)
# Step 2: Review the planned actions
print(plan.output.planned_actions)
# Step 3: Execute with dry_run disabled after review
result = await client.agent.run(
"Execute deployment",
restrictions={"dry_run": False}
)

Next
- Sessions — Session architecture
- Multi-Client — Collaboration patterns
Last updated on