
Connection

TL;DR

Connect to CMDOP via AsyncCMDOPClient.remote(api_key=...) for the cloud or AsyncCMDOPClient.local() for development. Always use async with so the connection is cleaned up automatically. Set the target machine with client.terminal.set_machine("hostname"). The SDK reconnects automatically with configurable exponential backoff and supports connection events, debug mode, and concurrent connections to multiple workspaces.

The SDK supports multiple connection modes for different use cases.

How do I connect to the cloud?

Connect to CMDOP’s cloud Control Plane:

from cmdop import AsyncCMDOPClient

# Connect to the cloud Control Plane using your API key
async with AsyncCMDOPClient.remote(api_key="cmd_xxx") as client:
    # Execute a command on the connected machine; returns the output string and exit code
    output, code = await client.terminal.execute("uptime")

What configuration options are available?

# Configure the remote client with custom timeout and retry settings
client = AsyncCMDOPClient.remote(
    api_key="cmd_xxx",
    timeout=30,       # Default request timeout in seconds
    max_retries=3,    # Number of times to retry failed requests
    retry_delay=1.0,  # Delay in seconds between retries
)
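To make the meaning of max_retries and retry_delay concrete, here is a minimal sketch of a fixed-delay retry loop. It is not the SDK's implementation: retry_async and make_flaky are hypothetical names, and treating max_retries as "retries after the first attempt" is an assumption.

```python
import asyncio


async def retry_async(operation, max_retries=3, retry_delay=1.0):
    """Call `operation` once, then retry up to `max_retries` times on failure."""
    last_error = None
    for attempt in range(max_retries + 1):  # 1 initial try + max_retries retries
        try:
            return await operation()
        except ConnectionError as error:
            last_error = error
            if attempt < max_retries:
                await asyncio.sleep(retry_delay)  # fixed pause between attempts
    raise last_error


def make_flaky(failures):
    """Hypothetical stand-in for a request that fails `failures` times, then succeeds."""
    state = {"calls": 0}

    async def flaky():
        state["calls"] += 1
        if state["calls"] <= failures:
            raise ConnectionError("temporary failure")
        return state["calls"]  # number of calls it took to succeed

    return flaky
```

Under these assumptions, a request that fails twice succeeds on the third call, while one that keeps failing raises the last error after four calls in total.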

How do I connect locally for development?

Connect to a local Control Plane:

from cmdop import AsyncCMDOPClient

# Connect to a local Control Plane running on your development machine
async with AsyncCMDOPClient.local(
    host="localhost",
    port=50051
) as client:
    # Run a command against the locally connected server
    output, code = await client.terminal.execute("uptime")

What local configuration options are available?

# Configure the local client with TLS disabled for development
client = AsyncCMDOPClient.local(
    host="localhost",
    port=50051,
    tls=False,   # Disable TLS for local dev (not for production)
    timeout=30,  # Request timeout in seconds
)

How do I use the context manager?

Always use async with for proper cleanup:

# Recommended: async with handles connection setup and teardown automatically
async with AsyncCMDOPClient.remote(api_key="cmd_xxx") as client:
    await client.terminal.execute("uptime")
# Connection is closed automatically when exiting the context manager

# Manual management: use try/finally to ensure cleanup if you cannot use async with
client = AsyncCMDOPClient.remote(api_key="cmd_xxx")
await client.connect()
try:
    await client.terminal.execute("uptime")
finally:
    # Always close the connection to release resources
    await client.close()
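The reason async with is preferred is that cleanup runs even when the body raises. The sketch below demonstrates this with a hypothetical StubClient that only records calls; it stands in for the real client and makes no claim about how the SDK implements __aenter__ or __aexit__.

```python
import asyncio


class StubClient:
    """Hypothetical stand-in that records connect/close calls; not the real SDK client."""

    def __init__(self):
        self.events = []

    async def connect(self):
        self.events.append("connect")

    async def close(self):
        self.events.append("close")

    async def __aenter__(self):
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.close()
        return False  # do not swallow the exception


async def demo():
    client = StubClient()
    try:
        async with client:
            raise RuntimeError("command failed")  # simulate a failing operation
    except RuntimeError:
        pass
    return client.events  # close was still recorded despite the error
```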

How do I select a target machine?

Set the target machine before operations:

async with AsyncCMDOPClient.remote(api_key="cmd_xxx") as client:
    # Set the target machine by hostname; all subsequent operations route here
    await client.terminal.set_machine("prod-server")

    # Execute a command on the selected machine
    output, code = await client.terminal.execute("uptime")

    # List files on the same machine
    files = await client.files.list("/var/log")

    # Switch to a different machine mid-session
    await client.terminal.set_machine("staging-server")
    output, code = await client.terminal.execute("uptime")

Machine by Hostname

# Select a machine using its hostname await client.terminal.set_machine("my-server")

Machine by ID

# Select a machine using its unique machine ID await client.terminal.set_machine(machine_id="mach_abc123")

How do I manage sessions?

Get Active Session

# Retrieve the currently active terminal session for a specific machine
session = await client.terminal.get_active_session("my-server")
print(f"Session ID: {session.session_id}")  # Unique identifier for this session
print(f"Status: {session.status}")          # e.g., "active", "idle", "disconnected"

List Sessions

# List all terminal sessions across all machines in the current workspace
sessions = await client.terminal.list_sessions()
for session in sessions:
    # Print each machine's hostname and its session status
    print(f"{session.machine_hostname}: {session.status}")
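Once sessions are listed, a common next step is to group machines by session status. The sketch below does this on a hypothetical SessionStub dataclass that mirrors only the fields shown above (session_id, machine_hostname, status); the real session object may carry more fields, and the status strings are the example values from this page.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class SessionStub:
    """Hypothetical stand-in for a session object returned by list_sessions()."""
    session_id: str
    machine_hostname: str
    status: str


def group_by_status(sessions):
    """Map each status to the hostnames currently in that status."""
    groups = defaultdict(list)
    for session in sessions:
        groups[session.status].append(session.machine_hostname)
    return dict(groups)


sample = [
    SessionStub("s1", "prod-server", "active"),
    SessionStub("s2", "staging-server", "idle"),
    SessionStub("s3", "my-server", "active"),
]
```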

Attach to Session

# Attach to an existing session to receive real-time terminal output
stream = client.terminal.stream()
await stream.attach(session.session_id)  # Connect to the session by its ID

How do I authenticate?

API Key

# Option 1: Pass the API key directly as a string argument
client = AsyncCMDOPClient.remote(api_key="cmd_xxx")

# Option 2: Read the API key from an environment variable (recommended for production)
import os
client = AsyncCMDOPClient.remote(api_key=os.environ["CMDOP_API_KEY"])
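Reading os.environ["CMDOP_API_KEY"] directly raises a bare KeyError when the variable is unset. A small helper can fail with a clearer message instead; load_api_key is a hypothetical name used for illustration and is not part of the SDK.

```python
import os


def load_api_key(env_var="CMDOP_API_KEY"):
    """Return the API key from the environment, or raise a descriptive error."""
    key = os.environ.get(env_var, "").strip()  # tolerate stray whitespace
    if not key:
        raise RuntimeError(f"Set the {env_var} environment variable before connecting")
    return key
```

The returned value can then be passed as api_key to AsyncCMDOPClient.remote(...).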

How does token refresh work?

API keys don’t expire. For OAuth tokens:

# Enable automatic token refresh for OAuth tokens (API keys do not expire)
client = AsyncCMDOPClient.remote(
    api_key="cmd_xxx",
    auto_refresh=True  # SDK will refresh the OAuth token before it expires
)

How do I handle connection events?

# Create the client instance (not yet connected)
client = AsyncCMDOPClient.remote(api_key="cmd_xxx")

# Register a handler that fires when the connection is established
@client.on_connect
async def handle_connect():
    print("Connected")

# Register a handler that fires when the connection is lost
@client.on_disconnect
async def handle_disconnect():
    print("Disconnected")

# Register a handler that fires on connection errors
@client.on_error
async def handle_error(error):
    print(f"Error: {error}")

# Manually open the connection (triggers on_connect if successful)
await client.connect()
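For readers unfamiliar with decorator-based handlers, the following sketch shows the general pattern: a decorator stores the coroutine in a registry, and an emit step awaits each stored handler. EventHub and its methods are hypothetical and only illustrate the idea; the SDK's internals may differ.

```python
import asyncio


class EventHub:
    """Hypothetical minimal event registry; not the SDK's implementation."""

    def __init__(self):
        self._handlers = {"connect": [], "disconnect": [], "error": []}

    def on(self, event):
        def register(handler):
            self._handlers[event].append(handler)
            return handler  # return the function unchanged so it stays callable
        return register

    async def emit(self, event, *args):
        # Await handlers in registration order
        for handler in self._handlers[event]:
            await handler(*args)


hub = EventHub()
seen = []


@hub.on("connect")
async def handle_connect():
    seen.append("connected")


@hub.on("error")
async def handle_error(error):
    seen.append(f"error: {error}")


async def simulate():
    await hub.emit("connect")
    await hub.emit("error", "connection reset")
    return seen
```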

How does automatic reconnection work?

The SDK handles reconnection automatically:

# Configure automatic reconnection with exponential backoff
client = AsyncCMDOPClient.remote(
    api_key="cmd_xxx",
    auto_reconnect=True,       # Enable auto-reconnect on connection loss
    reconnect_delay=1.0,       # Initial delay in seconds before first retry
    reconnect_max_delay=30.0,  # Maximum delay between retries (exponential backoff caps here)
    reconnect_max_attempts=10  # Stop retrying after 10 failed attempts
)
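To see what these settings imply in practice, the sketch below computes a capped exponential backoff schedule. The doubling factor of 2 and the absence of jitter are assumptions, since the page does not state the SDK's multiplier; backoff_delays is a hypothetical helper.

```python
def backoff_delays(initial=1.0, max_delay=30.0, max_attempts=10, factor=2.0):
    """Return the wait before each reconnect attempt, capped at max_delay.

    Assumes the delay is multiplied by `factor` after every failed attempt.
    """
    delays = []
    delay = initial
    for _ in range(max_attempts):
        delays.append(min(delay, max_delay))  # never wait longer than the cap
        delay *= factor
    return delays
```

With the values shown above, this schedule is 1, 2, 4, 8, and 16 seconds, then 30 seconds for each of the remaining five attempts, or 181 seconds of waiting in total before the client gives up.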

How do I connect to multiple workspaces?

# Create separate clients for each workspace, each with its own API key
acme_client = AsyncCMDOPClient.remote(api_key="cmd_acme_xxx")
globex_client = AsyncCMDOPClient.remote(api_key="cmd_globex_yyy")

# Use multiple context managers to connect to both workspaces simultaneously
async with acme_client, globex_client:
    # Target and execute on the Acme workspace
    await acme_client.terminal.set_machine("acme-server")
    await acme_client.terminal.execute("uptime")

    # Target and execute on the Globex workspace
    await globex_client.terminal.set_machine("globex-server")
    await globex_client.terminal.execute("uptime")
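When the number of workspaces is not known in advance, the standard library's contextlib.AsyncExitStack can enter any number of async context managers, and asyncio.gather can run the commands concurrently rather than one after another. The sketch uses a hypothetical StubWorkspaceClient so that it runs without the SDK; whether real clients can safely be used concurrently this way is an assumption to verify.

```python
import asyncio
from contextlib import AsyncExitStack


class StubWorkspaceClient:
    """Hypothetical stand-in for a per-workspace client."""

    def __init__(self, name):
        self.name = name
        self.connected = False

    async def __aenter__(self):
        self.connected = True
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self.connected = False
        return False

    async def execute(self, command):
        await asyncio.sleep(0)  # yield control, as real network I/O would
        return f"{self.name}: {command}", 0


async def run_everywhere(clients, command):
    """Connect all clients, run `command` on each concurrently, then disconnect."""
    async with AsyncExitStack() as stack:
        for client in clients:
            await stack.enter_async_context(client)
        # gather preserves the order of `clients` in its result list
        results = await asyncio.gather(*(c.execute(command) for c in clients))
    return results
```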

How do I configure timeouts?

from cmdop.exceptions import TimeoutError

# Set a global timeout of 30 seconds for all requests
client = AsyncCMDOPClient.remote(
    api_key="cmd_xxx",
    timeout=30  # Global timeout in seconds applies to every request
)

# Override the global timeout for a single long-running operation
try:
    output, code = await client.terminal.execute(
        "long-running-script.sh",
        timeout=300  # Allow 5 minutes (300s) for this specific command
    )
except TimeoutError:
    # Raised when the operation exceeds the specified timeout
    print("Operation timed out")
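The interplay between a global default and a per-call override can be sketched with the standard library's asyncio.wait_for. This is an illustration of the pattern only: run_with_timeout and slow are hypothetical, and asyncio.TimeoutError is a different class from the SDK's cmdop.exceptions.TimeoutError.

```python
import asyncio


async def run_with_timeout(coro_factory, timeout=None, default_timeout=30.0):
    """Await coro_factory() using the per-call timeout, or the default if none is given."""
    effective = default_timeout if timeout is None else timeout
    return await asyncio.wait_for(coro_factory(), timeout=effective)


async def slow():
    """Hypothetical operation that takes about 0.2 seconds."""
    await asyncio.sleep(0.2)
    return "done"
```

Calling run_with_timeout(slow, timeout=1) completes normally, while a timeout shorter than the operation cancels it and raises asyncio.TimeoutError.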

How do I enable debug mode?

import logging

# Set Python logging to DEBUG level to see all SDK internal messages
logging.basicConfig(level=logging.DEBUG)

# Enable the SDK's built-in debug mode for additional diagnostic output
client = AsyncCMDOPClient.remote(
    api_key="cmd_xxx",
    debug=True  # Logs request/response details, connection state changes, etc.
)

How do I check connection status?

# Check the boolean connection state before running commands
if client.is_connected:
    await client.terminal.execute("uptime")
else:
    # Re-establish the connection if it was lost
    await client.connect()

# Inspect detailed connection metadata
info = client.connection_info
print(f"Server: {info.server}")          # The server URL or address
print(f"Latency: {info.latency_ms}ms")   # Round-trip latency in milliseconds
