Task Execution

TL;DR

CMDOP task execution lets you describe goals in natural language and have AI autonomously run shell commands on remote machines. It supports structured Pydantic output schemas, multi-step workflows with rollback, streaming real-time events, dry-run safety restrictions, and approval workflows for critical operations. Ideal for deployments, incident response, and log analysis.

Give AI a goal, let it figure out and execute the necessary commands.

How do I run a basic AI task?

```python
from cmdop import AsyncCMDOPClient
from pydantic import BaseModel

# Define the expected shape of the AI's response
class TaskResult(BaseModel):
    success: bool
    actions_taken: list[str]
    output: str
    error: str | None

async with AsyncCMDOPClient.remote(api_key="cmd_xxx") as client:
    # Connect to the target machine
    await client.terminal.set_machine("prod-server")

    # Run a natural-language prompt; AI decides which shell commands to execute
    result = await client.agent.run(
        prompt="Restart the nginx service and verify it's running",
        output_schema=TaskResult,
    )

    # Access typed fields directly from the structured result
    if result.output.success:
        print("Nginx restarted successfully")
        print(f"Actions: {result.output.actions_taken}")
    else:
        print(f"Failed: {result.output.error}")
```

How do I handle complex multi-step tasks?

Deployment

```python
# Schema capturing every detail of a deployment outcome
class DeployResult(BaseModel):
    success: bool
    version: str
    steps_completed: list[str]
    duration_seconds: float
    tests_passed: int
    tests_failed: int
    rollback_performed: bool
    error: str | None

# Multi-step prompt: AI executes each instruction sequentially
result = await client.agent.run(
    prompt="""
    Deploy version 2.1.0:
    1. Pull latest code from git
    2. Install dependencies
    3. Run database migrations
    4. Run tests
    5. If tests pass, restart application
    6. If tests fail, rollback and notify
    """,
    output_schema=DeployResult,
)

# Branch on rollback flag to decide notification content
if result.output.rollback_performed:
    notify_team(f"Deploy failed, rolled back: {result.output.error}")
else:
    notify_team(f"Deployed v{result.output.version} in {result.output.duration_seconds}s")
```

Incident Response

```python
# Schema for incident investigation with escalation support
class IncidentAnalysis(BaseModel):
    root_cause: str
    affected_services: list[str]
    impact: str
    mitigation_steps: list[str]
    steps_taken: list[str]
    resolved: bool
    requires_human: bool
    escalation_reason: str | None

# AI investigates the alert and decides whether to auto-fix or escalate
result = await client.agent.run(
    prompt="""
    Investigate high CPU alert:
    1. Identify process causing high CPU
    2. Check if it's a runaway process or legitimate load
    3. If runaway, safely terminate it
    4. Check logs for related errors
    5. Determine root cause
    """,
    output_schema=IncidentAnalysis,
)

# Route outcome: escalate to on-call or log the auto-resolution
if result.output.requires_human:
    escalate_to_oncall(result.output)
else:
    log_resolution(result.output)
```

Log Analysis

```python
# Represents a single recurring error pattern found in logs
class ErrorPattern(BaseModel):
    pattern: str
    count: int
    first_occurrence: str
    last_occurrence: str
    example: str

# Aggregated analysis across all error patterns
class LogAnalysis(BaseModel):
    total_errors: int
    error_patterns: list[ErrorPattern]
    potential_causes: list[str]
    recommended_fixes: list[str]

# AI reads the log file, groups errors by pattern, and suggests fixes
result = await client.agent.run(
    prompt="Analyze /var/log/app.log for the last hour, identify error patterns and suggest fixes",
    output_schema=LogAnalysis,
)

# Auto-create tickets for high-frequency error patterns
for pattern in result.output.error_patterns:
    if pattern.count > 100:
        create_ticket(f"Frequent error: {pattern.pattern}", pattern.example)
```
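The kind of grouping the AI performs remotely can be sketched locally: normalize the variable parts of each error line so repeated failures collapse into one pattern. `extract_patterns` below is illustrative, not part of the SDK:

```python
import re
from collections import Counter

def extract_patterns(lines: list[str]) -> tuple[Counter, dict[str, str]]:
    """Group ERROR lines by a normalized pattern; keep one example line per pattern."""
    counts: Counter = Counter()
    examples: dict[str, str] = {}
    for line in lines:
        if "ERROR" not in line:
            continue
        message = line.split("ERROR", 1)[1].strip()
        # Replace numbers so "timeout after 30s" and "timeout after 45s" group together
        pattern = re.sub(r"\d+", "<N>", message)
        counts[pattern] += 1
        examples.setdefault(pattern, line)
    return counts, examples
```

Two timeouts with different durations collapse into the single pattern `timeout after <N>s`, which is what makes a `count > 100` threshold meaningful.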

How do I stream AI execution in real time?

Watch the AI work in real time:

```python
# run_stream yields events as AI thinks, calls tools, and produces results
async for event in client.agent.run_stream(
    prompt="Deploy and monitor",
    output_schema=DeployResult,
):
    if event.type == "thinking":
        print(f"AI: {event.content}")                      # AI's reasoning step
    elif event.type == "tool_call":
        print(f"Running: {event.tool} with {event.args}")  # Shell command being invoked
    elif event.type == "tool_result":
        print(f"Output: {event.result[:100]}...")          # Truncated command output
    elif event.type == "result":
        deploy_result = event.output                       # Final structured result
```
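The same stream is handy for building an audit trail of what was actually run. A minimal sketch, with a stand-in `Event` dataclass whose shape is assumed from the fields used above:

```python
from dataclasses import dataclass, field

# Stand-in for the event objects run_stream yields (fields assumed:
# type, content, tool, args, result)
@dataclass
class Event:
    type: str
    content: str = ""
    tool: str = ""
    args: dict = field(default_factory=dict)
    result: str = ""

def build_audit_trail(events: list[Event]) -> list[tuple[str, dict]]:
    """Collect only the commands that were actually executed, for later review."""
    return [(e.tool, e.args) for e in events if e.type == "tool_call"]
```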

How do I provide context to guide AI decisions?

Provide additional context to guide AI:

```python
# Pass a context dict so AI understands the environment before acting
result = await client.agent.run(
    prompt="Fix the out of memory issue",
    output_schema=FixResult,
    context={
        "service": "api-server",
        "memory_limit": "4GB",
        "current_usage": "3.8GB",
        "recent_deployments": ["v2.0.5 (2 hours ago)"],
        "environment": "production",
    },
)
```

How do I restrict what AI can do?

Limit which commands, directories, and privileges the AI may use:

```python
# restrictions dict constrains which commands/actions AI is allowed to run
result = await client.agent.run(
    prompt="Clean up old files in /tmp",
    output_schema=CleanupResult,
    restrictions={
        "no_delete": False,  # Allow deletion
        "no_sudo": True,     # No root commands
        "dry_run": True,     # Show what would happen, don't do it
        "allowed_dirs": ["/tmp", "/var/cache"],
        "forbidden_commands": ["rm -rf /", "shutdown"],
    },
)

# Review the dry-run plan, then re-run with dry_run=False to execute
if approve(result.output.planned_actions):
    result = await client.agent.run(
        prompt="Execute the cleanup",
        restrictions={"dry_run": False},
    )
```
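To reason about what a restrictions dict permits, a rough client-side sketch of the semantics these flags imply may help. `command_allowed` is illustrative only, not part of the cmdop API, and the real enforcement presumably lives server-side:

```python
def command_allowed(cmd: str, restrictions: dict) -> bool:
    """Rough sketch of restriction semantics: sudo ban, forbidden list, delete ban."""
    if restrictions.get("no_sudo") and cmd.strip().startswith("sudo"):
        return False
    if any(bad in cmd for bad in restrictions.get("forbidden_commands", [])):
        return False
    if restrictions.get("no_delete") and cmd.strip().startswith("rm"):
        return False
    return True
```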

How do I break tasks into sequential steps?

Break complex tasks into steps:

```python
# Individual step outcome within a larger migration
class StepResult(BaseModel):
    step_name: str
    success: bool
    output: str
    continue_to_next: bool

# Overall migration result aggregating all steps
class MigrationResult(BaseModel):
    steps: list[StepResult]
    overall_success: bool
    database_records_migrated: int
    rollback_needed: bool

# AI executes steps in order and stops on failure
result = await client.agent.run(
    prompt="""
    Migrate database:
    Step 1: Backup current database
    Step 2: Run schema migrations
    Step 3: Run data migrations
    Step 4: Verify data integrity
    Step 5: Update application config
    Stop and report if any step fails.
    """,
    output_schema=MigrationResult,
)

# Print a summary of each step's pass/fail status
for step in result.output.steps:
    print(f"{step.step_name}: {'✓' if step.success else '✗'}")
```

How does conditional logic work in AI tasks?

```python
# Health check schema with optional action tracking
class HealthCheck(BaseModel):
    service: str
    status: str
    action_taken: str | None
    restarted: bool

# AI follows branching logic based on real-time service state
result = await client.agent.run(
    prompt="""
    Check if the service is responding:
    - If healthy, report status
    - If unhealthy for < 5 minutes, wait and retry
    - If unhealthy for > 5 minutes, restart service
    - After restart, verify it's healthy
    """,
    output_schema=HealthCheck,
)
```

Error Handling

```python
# Schema that tracks retry attempts and final outcome
class TaskWithRetry(BaseModel):
    success: bool
    attempts: int
    final_error: str | None
    result: str | None

# max_retries tells the agent to automatically retry on transient failures
result = await client.agent.run(
    prompt="""
    Deploy the application:
    - If deployment fails, retry up to 3 times
    - If still failing, collect diagnostics
    - Report final status
    """,
    output_schema=TaskWithRetry,
    max_retries=3,
)
```
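Conceptually, `max_retries` wraps the task in a retry loop. A standalone sketch of that pattern with exponential backoff; `run_with_retry` is illustrative, not part of the SDK:

```python
import asyncio

async def run_with_retry(task, max_retries: int = 3, base_delay: float = 0.01):
    """Retry a coroutine-producing callable; re-raise after the final attempt."""
    for attempt in range(1, max_retries + 1):
        try:
            return attempt, await task()
        except Exception:
            if attempt == max_retries:
                raise
            # Exponential backoff between attempts: base, 2x base, 4x base, ...
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
```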

How do I set up an approval workflow for critical operations?

For critical operations, have the AI produce a plan first and execute it only after explicit human approval:

```python
# Schema for the dry-run plan AI generates before execution
class ActionPlan(BaseModel):
    planned_commands: list[str]
    affected_services: list[str]
    estimated_downtime: str
    risk_level: str

# Step 1: Get the plan in dry-run mode (no changes made)
plan = await client.agent.run(
    prompt="Plan a zero-downtime deployment of v3.0",
    output_schema=ActionPlan,
    restrictions={"dry_run": True},
)

# Step 2: Show the plan to the operator for review
print("Planned actions:")
for cmd in plan.output.planned_commands:
    print(f"  - {cmd}")
print(f"Risk: {plan.output.risk_level}")

# Step 3: Execute only after explicit human approval
if get_user_approval():
    result = await client.agent.run(
        prompt=f"Execute the deployment plan: {plan.output.planned_commands}",
        output_schema=DeployResult,
    )
```
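`get_user_approval` above is left for you to implement. One possible shape, as a plain sketch: auto-approve low-risk plans and defer everything else to an operator callback (for example, a CLI prompt or a chat-ops button):

```python
def approve_plan(risk_level: str, confirm, auto_approve=frozenset({"low"})) -> bool:
    """Auto-approve plans at an allowed risk level; otherwise ask the operator."""
    if risk_level.lower() in auto_approve:
        return True
    # confirm is any zero-argument callable returning a truthy/falsy decision
    return bool(confirm())
```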

What are the best practices for AI task execution?

1. Be Specific About Success Criteria

```python
# Clearly define what counts as success vs failure in the prompt
prompt = """
Deploy v2.0:
- SUCCESS: All tests pass, service responds to health check
- FAILURE: Any test fails OR service doesn't respond within 60 seconds
"""
```

2. Include Rollback Instructions

```python
# Always include rollback steps so AI can recover from failures
prompt = """
Deploy v2.0:
- If deployment fails, rollback to previous version
- Verify rollback was successful
- Report both outcomes
"""
```

3. Use Dry Run for Dangerous Operations

```python
# Preview changes before committing them
result = await client.agent.run(
    prompt="Clean up old deployments",
    restrictions={"dry_run": True},
)
# Review, then execute
```

4. Set Timeouts

```python
# Prevent runaway tasks by setting a maximum execution time
result = await client.agent.run(
    prompt="Run performance tests",
    output_schema=TestResult,
    timeout=300,  # 5 minutes max
)
```
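How `agent.run` surfaces a timeout is API-specific; in asyncio code generally, bounding a coroutine's runtime looks like `asyncio.wait_for`. A generic sketch of handling the timeout path (`never_finishes` and `main` are illustrative):

```python
import asyncio

async def never_finishes():
    # Stand-in for a task that exceeds its time budget
    await asyncio.sleep(10)

async def main():
    try:
        # Bound any coroutine to a maximum execution time
        await asyncio.wait_for(never_finishes(), timeout=0.05)
        return "completed"
    except asyncio.TimeoutError:
        return "timed out"

print(asyncio.run(main()))  # → timed out
```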
