Task Execution
TL;DR
CMDOP task execution lets you describe goals in natural language and have AI autonomously run shell commands on remote machines. It supports structured Pydantic output schemas, multi-step workflows with rollback, streaming real-time events, dry-run safety restrictions, and approval workflows for critical operations. Ideal for deployments, incident response, and log analysis.
Give AI a goal, let it figure out and execute the necessary commands.
How do I run a basic AI task?
from cmdop import AsyncCMDOPClient
from pydantic import BaseModel

# Define the expected shape of the AI's response
class TaskResult(BaseModel):
    success: bool
    actions_taken: list[str]
    output: str
    error: str | None

async with AsyncCMDOPClient.remote(api_key="cmd_xxx") as client:
    # Connect to the target machine
    await client.terminal.set_machine("prod-server")

    # Run a natural-language prompt; AI decides which shell commands to execute
    result = await client.agent.run(
        prompt="Restart the nginx service and verify it's running",
        output_schema=TaskResult
    )

    # Access typed fields directly from the structured result
    if result.output.success:
        print("Nginx restarted successfully")
        print(f"Actions: {result.output.actions_taken}")
    else:
        print(f"Failed: {result.output.error}")

How do I handle complex multi-step tasks?
Deployment
# Schema capturing every detail of a deployment outcome
class DeployResult(BaseModel):
    success: bool
    version: str
    steps_completed: list[str]
    duration_seconds: float
    tests_passed: int
    tests_failed: int
    rollback_performed: bool
    error: str | None

# Multi-step prompt: AI executes each instruction sequentially
result = await client.agent.run(
    prompt="""
    Deploy version 2.1.0:
    1. Pull latest code from git
    2. Install dependencies
    3. Run database migrations
    4. Run tests
    5. If tests pass, restart application
    6. If tests fail, roll back and notify
    """,
    output_schema=DeployResult
)

# Branch on rollback flag to decide notification content
if result.output.rollback_performed:
    notify_team(f"Deploy failed, rolled back: {result.output.error}")
else:
    notify_team(f"Deployed v{result.output.version} in {result.output.duration_seconds}s")

Incident Response
# Schema for incident investigation with escalation support
class IncidentAnalysis(BaseModel):
    root_cause: str
    affected_services: list[str]
    impact: str
    mitigation_steps: list[str]
    steps_taken: list[str]
    resolved: bool
    requires_human: bool
    escalation_reason: str | None

# AI investigates the alert and decides whether to auto-fix or escalate
result = await client.agent.run(
    prompt="""
    Investigate high CPU alert:
    1. Identify process causing high CPU
    2. Check if it's a runaway process or legitimate load
    3. If runaway, safely terminate it
    4. Check logs for related errors
    5. Determine root cause
    """,
    output_schema=IncidentAnalysis
)

# Route outcome: escalate to on-call or log the auto-resolution
if result.output.requires_human:
    escalate_to_oncall(result.output)
else:
    log_resolution(result.output)

Log Analysis
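The example below ends by calling `create_ticket`, which is not part of the SDK — it stands in for your issue tracker. For local experimentation, a minimal in-memory stand-in might look like this (a purely hypothetical helper; swap in your Jira/GitHub client for real use):

```python
# In-memory stand-in for a real ticketing integration (hypothetical helper,
# not part of the CMDOP SDK).
tickets: list[dict] = []

def create_ticket(title: str, body: str) -> dict:
    """Record a ticket locally and return it, de-duplicating by title."""
    for ticket in tickets:
        if ticket["title"] == title:
            return ticket  # Don't file duplicates for the same error pattern
    ticket = {"id": len(tickets) + 1, "title": title, "body": body}
    tickets.append(ticket)
    return ticket
```

De-duplicating by title keeps a noisy log from flooding the tracker when the same pattern trips the threshold on every run.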
# Represents a single recurring error pattern found in logs
class ErrorPattern(BaseModel):
    pattern: str
    count: int
    first_occurrence: str
    last_occurrence: str
    example: str

# Aggregated analysis across all error patterns
class LogAnalysis(BaseModel):
    total_errors: int
    error_patterns: list[ErrorPattern]
    potential_causes: list[str]
    recommended_fixes: list[str]

# AI reads the log file, groups errors by pattern, and suggests fixes
result = await client.agent.run(
    prompt="Analyze /var/log/app.log for the last hour, identify error patterns and suggest fixes",
    output_schema=LogAnalysis
)

# Auto-create tickets for high-frequency error patterns
for pattern in result.output.error_patterns:
    if pattern.count > 100:
        create_ticket(f"Frequent error: {pattern.pattern}", pattern.example)

How do I stream AI execution in real time?
Watch AI work in real time:
# run_stream yields events as AI thinks, calls tools, and produces results
async for event in client.agent.run_stream(
    prompt="Deploy and monitor",
    output_schema=DeployResult
):
    if event.type == "thinking":
        print(f"AI: {event.content}")  # AI's reasoning step
    elif event.type == "tool_call":
        print(f"Running: {event.tool} with {event.args}")  # Shell command being invoked
    elif event.type == "tool_result":
        print(f"Output: {event.result[:100]}...")  # Truncated command output
    elif event.type == "result":
        deploy_result = event.output  # Final structured result

How do I provide context to guide AI decisions?
Provide additional context to guide AI:
# Pass a context dict so AI understands the environment before acting
result = await client.agent.run(
    prompt="Fix the out of memory issue",
    output_schema=FixResult,
    context={
        "service": "api-server",
        "memory_limit": "4GB",
        "current_usage": "3.8GB",
        "recent_deployments": ["v2.0.5 (2 hours ago)"],
        "environment": "production"
    }
)

How do I restrict what AI can do?
Limit what AI can do:
# restrictions dict constrains which commands/actions AI is allowed to run
result = await client.agent.run(
    prompt="Clean up old files in /tmp",
    output_schema=CleanupResult,
    restrictions={
        "no_delete": False,  # Allow deletion
        "no_sudo": True,     # No root commands
        "dry_run": True,     # Show what would happen, don't do it
        "allowed_dirs": ["/tmp", "/var/cache"],
        "forbidden_commands": ["rm -rf /", "shutdown"]
    }
)

# Review the dry-run plan, then re-run with dry_run=False to execute
if approve(result.output.planned_actions):
    result = await client.agent.run(
        prompt="Execute the cleanup",
        restrictions={"dry_run": False}
    )

How do I break tasks into sequential steps?
Break complex tasks into steps:
# Individual step outcome within a larger migration
class StepResult(BaseModel):
    step_name: str
    success: bool
    output: str
    continue_to_next: bool

# Overall migration result aggregating all steps
class MigrationResult(BaseModel):
    steps: list[StepResult]
    overall_success: bool
    database_records_migrated: int
    rollback_needed: bool

# AI executes steps in order and stops on failure
result = await client.agent.run(
    prompt="""
    Migrate database:
    Step 1: Backup current database
    Step 2: Run schema migrations
    Step 3: Run data migrations
    Step 4: Verify data integrity
    Step 5: Update application config
    Stop and report if any step fails.
    """,
    output_schema=MigrationResult
)

# Print a summary of each step's pass/fail status
for step in result.output.steps:
    print(f"{step.step_name}: {'✓' if step.success else '✗'}")

How does conditional logic work in AI tasks?
# Health check schema with optional action tracking
class HealthCheck(BaseModel):
    service: str
    status: str
    action_taken: str | None
    restarted: bool

# AI follows branching logic based on real-time service state
result = await client.agent.run(
    prompt="""
    Check if the service is responding:
    - If healthy, report status
    - If unhealthy for < 5 minutes, wait and retry
    - If unhealthy for > 5 minutes, restart service
    - After restart, verify it's healthy
    """,
    output_schema=HealthCheck
)

Error Handling
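Beyond the prompt-level retries shown below, you may want a client-side safety net. Assuming `client.agent.run` raises an ordinary exception on failure (an assumption — check your SDK version), a generic async retry wrapper with exponential backoff might look like:

```python
import asyncio

async def run_with_retries(coro_factory, max_attempts: int = 3, base_delay: float = 1.0):
    """Call coro_factory() up to max_attempts times, backing off exponentially.

    coro_factory must be a zero-argument callable returning a fresh coroutine,
    e.g. lambda: client.agent.run(prompt=..., output_schema=...).
    """
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            return await coro_factory()
        except Exception as exc:
            last_error = exc
            if attempt < max_attempts:
                # Wait 1x, 2x, 4x ... base_delay between attempts
                await asyncio.sleep(base_delay * 2 ** (attempt - 1))
    raise last_error
```

A factory (rather than a coroutine object) is required because a coroutine can only be awaited once; each retry needs a fresh one.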
# Schema that tracks retry attempts and final outcome
class TaskWithRetry(BaseModel):
    success: bool
    attempts: int
    final_error: str | None
    result: str | None

# max_retries tells the agent to automatically retry on transient failures
result = await client.agent.run(
    prompt="""
    Deploy the application:
    - If deployment fails, retry up to 3 times
    - If still failing, collect diagnostics
    - Report final status
    """,
    output_schema=TaskWithRetry,
    max_retries=3
)

How do I set up an approval workflow for critical operations?
For critical operations:
# Schema for the dry-run plan AI generates before execution
class ActionPlan(BaseModel):
    planned_commands: list[str]
    affected_services: list[str]
    estimated_downtime: str
    risk_level: str

# Step 1: Get the plan in dry-run mode (no changes made)
plan = await client.agent.run(
    prompt="Plan a zero-downtime deployment of v3.0",
    output_schema=ActionPlan,
    restrictions={"dry_run": True}
)

# Step 2: Show the plan to the operator for review
print("Planned actions:")
for cmd in plan.output.planned_commands:
    print(f"  - {cmd}")
print(f"Risk: {plan.output.risk_level}")

# Step 3: Execute only after explicit human approval
if get_user_approval():
    result = await client.agent.run(
        prompt=f"Execute the deployment plan: {plan.output.planned_commands}",
        output_schema=DeployResult
    )

What are the best practices for AI task execution?
1. Be Specific About Success Criteria
# Clearly define what counts as success vs failure in the prompt
prompt = """
Deploy v2.0:
- SUCCESS: All tests pass, service responds to health check
- FAILURE: Any test fails OR service doesn't respond within 60 seconds
"""

2. Include Rollback Instructions
# Always include rollback steps so AI can recover from failures
prompt = """
Deploy v2.0:
- If deployment fails, roll back to the previous version
- Verify rollback was successful
- Report both outcomes
"""

3. Use Dry Run for Dangerous Operations
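The earlier examples gate execution on an `approve(...)` helper, which the SDK does not provide. One way to implement it is an automated policy check over the dry-run plan — reject anything that strays outside an allowlist (a sketch; the path allowlist is illustrative):

```python
ALLOWED_PREFIXES = ("/tmp", "/var/cache")  # Illustrative allowlist of safe paths

def approve(planned_actions: list[str]) -> bool:
    """Auto-approve a dry-run plan only if every action stays inside allowed paths."""
    for action in planned_actions:
        # Extract absolute paths mentioned in the planned command
        paths = [tok for tok in action.split() if tok.startswith("/")]
        if any(not p.startswith(ALLOWED_PREFIXES) for p in paths):
            return False  # Touches something outside the allowlist: needs a human
    return bool(planned_actions)  # An empty plan is nothing to approve
```

For riskier operations, replace the automatic check with a real human-in-the-loop prompt, as in the approval workflow section above.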
# Preview changes before committing them
result = await client.agent.run(
    prompt="Clean up old deployments",
    restrictions={"dry_run": True}
)
# Review, then execute

4. Set Timeouts
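If your SDK version lacks the `timeout` parameter shown below (an assumption worth verifying), `asyncio.wait_for` gives an equivalent client-side cutoff:

```python
import asyncio

async def run_with_deadline(coro, seconds: float, fallback=None):
    """Await coro, but give up after `seconds` and return `fallback` instead."""
    try:
        return await asyncio.wait_for(coro, timeout=seconds)
    except asyncio.TimeoutError:
        return fallback
```

Usage would be `await run_with_deadline(client.agent.run(prompt=...), 300)`; note that a cancelled task may leave the remote command still running, so prefer a server-side timeout when available.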
# Prevent runaway tasks by setting a maximum execution time
result = await client.agent.run(
    prompt="Run performance tests",
    output_schema=TestResult,
    timeout=300  # 5 minutes max
)

Next
- Fleet Management — Multi-machine orchestration
- Structured Output — Schema design