Connection
TL;DR
Connect to Cmdop via AsyncCMDOPClient.remote(api_key=...) for cloud or
AsyncCMDOPClient.local() for development. Always use async with for automatic cleanup.
Set target machines with client.terminal.set_machine("hostname"). The SDK handles
reconnection automatically with configurable exponential backoff and supports connection
events, debug mode, and multiple concurrent workspace connections.
The SDK supports multiple connection modes for different use cases.
How do I connect to the cloud?
Connect to CMDOPβs cloud Control Plane:
from cmdop import AsyncCMDOPClient
# Connect to the cloud Control Plane using your API key
async with AsyncCMDOPClient.remote(api_key="cmd_xxx") as client:
# Execute a command on the connected machine; returns output string and exit code
output, code = await client.terminal.execute("uptime")What configuration options are available?
# Configure the remote client with custom timeout and retry settings
client = AsyncCMDOPClient.remote(
api_key="cmd_xxx",
timeout=30, # Default request timeout in seconds
max_retries=3, # Number of times to retry failed requests
retry_delay=1.0, # Delay in seconds between retries
)How do I connect locally for development?
Connect to a local Control Plane:
from cmdop import AsyncCMDOPClient
# Connect to a local Control Plane running on your development machine
async with AsyncCMDOPClient.local(
host="localhost",
port=50051
) as client:
# Run a command against the locally connected server
output, code = await client.terminal.execute("uptime")What local configuration options are available?
# Configure the local client with TLS disabled for development
client = AsyncCMDOPClient.local(
host="localhost",
port=50051,
tls=False, # Disable TLS for local dev (not for production)
timeout=30, # Request timeout in seconds
)How do I use the context manager?
Always use async with for proper cleanup:
# Recommended: async with handles connection setup and teardown automatically
async with AsyncCMDOPClient.remote(api_key="cmd_xxx") as client:
await client.terminal.execute("uptime")
# Connection is closed automatically when exiting the context manager
# Manual management β use try/finally to ensure cleanup if you cannot use async with
client = AsyncCMDOPClient.remote(api_key="cmd_xxx")
await client.connect()
try:
await client.terminal.execute("uptime")
finally:
# Always close the connection to release resources
await client.close()How do I select a target machine?
Set the target machine before operations:
async with AsyncCMDOPClient.remote(api_key="cmd_xxx") as client:
# Set the target machine by hostname β all subsequent operations route here
await client.terminal.set_machine("prod-server")
# Execute a command on the selected machine
output, code = await client.terminal.execute("uptime")
# List files on the same machine
files = await client.files.list("/var/log")
# Switch to a different machine mid-session
await client.terminal.set_machine("staging-server")
output, code = await client.terminal.execute("uptime")Machine by Hostname
# Select a machine using its hostname
await client.terminal.set_machine("my-server")Machine by ID
# Select a machine using its unique machine ID
await client.terminal.set_machine(machine_id="mach_abc123")How do I manage sessions?
Get Active Session
# Retrieve the currently active terminal session for a specific machine
session = await client.terminal.get_active_session("my-server")
print(f"Session ID: {session.session_id}") # Unique identifier for this session
print(f"Status: {session.status}") # e.g., "active", "idle", "disconnected"List Sessions
# List all terminal sessions across all machines in the current workspace
sessions = await client.terminal.list_sessions()
for session in sessions:
# Print each machine's hostname and its session status
print(f"{session.machine_hostname}: {session.status}")Attach to Session
# Attach to an existing session to receive real-time terminal output
stream = client.terminal.stream()
await stream.attach(session.session_id) # Connect to the session by its IDHow do I authenticate?
API Key
# Option 1: Pass the API key directly as a string argument
client = AsyncCMDOPClient.remote(api_key="cmd_xxx")
# Option 2: Read the API key from an environment variable (recommended for production)
import os
client = AsyncCMDOPClient.remote(api_key=os.environ["CMDOP_API_KEY"])How does token refresh work?
API keys donβt expire. For OAuth tokens:
# Enable automatic token refresh for OAuth tokens (API keys do not expire)
client = AsyncCMDOPClient.remote(
api_key="cmd_xxx",
auto_refresh=True # SDK will refresh the OAuth token before it expires
)How do I handle connection events?
# Create the client instance (not yet connected)
client = AsyncCMDOPClient.remote(api_key="cmd_xxx")
# Register a handler that fires when the connection is established
@client.on_connect
async def handle_connect():
print("Connected")
# Register a handler that fires when the connection is lost
@client.on_disconnect
async def handle_disconnect():
print("Disconnected")
# Register a handler that fires on connection errors
@client.on_error
async def handle_error(error):
print(f"Error: {error}")
# Manually open the connection (triggers on_connect if successful)
await client.connect()How does automatic reconnection work?
The SDK handles reconnection automatically:
# Configure automatic reconnection with exponential backoff
client = AsyncCMDOPClient.remote(
api_key="cmd_xxx",
auto_reconnect=True, # Enable auto-reconnect on connection loss
reconnect_delay=1.0, # Initial delay in seconds before first retry
reconnect_max_delay=30.0, # Maximum delay between retries (exponential backoff caps here)
reconnect_max_attempts=10 # Stop retrying after 10 failed attempts
)How do I connect to multiple workspaces?
# Create separate clients for each workspace, each with its own API key
acme_client = AsyncCMDOPClient.remote(api_key="cmd_acme_xxx")
globex_client = AsyncCMDOPClient.remote(api_key="cmd_globex_yyy")
# Use multiple context managers to connect to both workspaces simultaneously
async with acme_client, globex_client:
# Target and execute on the Acme workspace
await acme_client.terminal.set_machine("acme-server")
await acme_client.terminal.execute("uptime")
# Target and execute on the Globex workspace
await globex_client.terminal.set_machine("globex-server")
await globex_client.terminal.execute("uptime")How do I configure timeouts?
from cmdop.exceptions import TimeoutError
# Set a global timeout of 30 seconds for all requests
client = AsyncCMDOPClient.remote(
api_key="cmd_xxx",
timeout=30 # Global timeout in seconds applies to every request
)
# Override the global timeout for a single long-running operation
try:
output, code = await client.terminal.execute(
"long-running-script.sh",
timeout=300 # Allow 5 minutes (300s) for this specific command
)
except TimeoutError:
# Raised when the operation exceeds the specified timeout
print("Operation timed out")How do I enable debug mode?
import logging
# Set Python logging to DEBUG level to see all SDK internal messages
logging.basicConfig(level=logging.DEBUG)
# Enable the SDK's built-in debug mode for additional diagnostic output
client = AsyncCMDOPClient.remote(
api_key="cmd_xxx",
debug=True # Logs request/response details, connection state changes, etc.
)How do I check connection status?
# Check the boolean connection state before running commands
if client.is_connected:
await client.terminal.execute("uptime")
else:
# Re-establish the connection if it was lost
await client.connect()
# Inspect detailed connection metadata
info = client.connection_info
print(f"Server: {info.server}") # The server URL or address
print(f"Latency: {info.latency_ms}ms") # Round-trip latency in millisecondsNext
- Terminal Service β Execute commands
- Files Service β File operations
Last updated on