unsandboxunsandbox.com

Anonymous remote code, compile, & execution API for humans & machine learning agents.

Docs 📚 View Pricing →
Python
UN CLI
un.py 41.1 KB · 993 lines

Usage

# Run this implementation to execute a Python script
python3 cli/inception/un.py test/fib.py

Integration Quickstart ⚡

Add unsandbox superpowers to your existing Python app:

1
Download
curl -O https://git.unturf.com/engineering/unturf/un-inception/-/raw/main/un.py
2
Set API Keys
export UNSANDBOX_PUBLIC_KEY="unsb-pk-xxxx-xxxx-xxxx-xxxx"
export UNSANDBOX_SECRET_KEY="unsb-sk-xxxx-xxxx-xxxx-xxxx"
3
Hello World
# In your Python app:
from un import execute_code

result = execute_code("python", "print('Hello from unsandbox!')")
print(result["stdout"])  # Hello from unsandbox!
What you can do
execute_code(lang, code) Run code in 42+ languages
create_session() Interactive shells & REPLs
create_service() Deploy persistent HTTPS apps
snapshot_session() Save & restore container state

Source Code 📄

# PUBLIC DOMAIN - NO LICENSE, NO WARRANTY
#
# This is free public domain software for the public good of a permacomputer hosted
# at permacomputer.com - an always-on computer by the people, for the people. One
# which is durable, easy to repair, and distributed like tap water for machine
# learning intelligence.
#
# The permacomputer is community-owned infrastructure optimized around four values:
#
#   TRUTH    - First principles, math & science, open source code freely distributed
#   FREEDOM  - Voluntary partnerships, freedom from tyranny & corporate control
#   HARMONY  - Minimal waste, self-renewing systems with diverse thriving connections
#   LOVE     - Be yourself without hurting others, cooperation through natural law
#
# This software contributes to that vision by enabling code execution across 42+
# programming languages through a unified interface, accessible to all. Code is
# seeds to sprout on any abandoned technology.
#
# Learn more: https://www.permacomputer.com
#
# Anyone is free to copy, modify, publish, use, compile, sell, or distribute this
# software, either in source code form or as a compiled binary, for any purpose,
# commercial or non-commercial, and by any means.
#
# NO WARRANTY. THE SOFTWARE IS PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND.
#
# That said, our permacomputer's digital membrane stratum continuously runs unit,
# integration, and functional tests on all of it's own software - with our
# permacomputer monitoring itself, repairing itself, with minimal human in the
# loop guidance. Our agents do their best.
#
# Copyright 2025 TimeHexOn & foxhop & russell@unturf
# https://www.timehexon.com
# https://www.foxhop.net
# https://www.unturf.com/software

#!/usr/bin/env python3
"""
un.py - Unsandbox CLI Client (Python Implementation)

Full-featured CLI matching un.c capabilities:
- Execute code with env vars, input files, artifacts
- Interactive sessions with shell/REPL support
- Persistent services with domains and ports

Usage:
  un.py [options] <source_file>
  un.py session [options]
  un.py service [options]

Requires: UNSANDBOX_API_KEY environment variable
"""

import sys
import os
import json
import base64
import argparse
import urllib.request
import urllib.error
import webbrowser
import hmac
import hashlib
import time

API_BASE = "https://api.unsandbox.com"
PORTAL_BASE = "https://unsandbox.com"
BLUE = "\033[34m"
RED = "\033[31m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
RESET = "\033[0m"

# Extension to language mapping
EXT_MAP = {
    ".py": "python", ".js": "javascript", ".ts": "typescript",
    ".rb": "ruby", ".php": "php", ".pl": "perl", ".lua": "lua",
    ".sh": "bash", ".go": "go", ".rs": "rust", ".c": "c",
    ".cpp": "cpp", ".cc": "cpp", ".cxx": "cpp",
    ".java": "java", ".kt": "kotlin", ".cs": "csharp", ".fs": "fsharp",
    ".hs": "haskell", ".ml": "ocaml", ".clj": "clojure", ".scm": "scheme",
    ".lisp": "commonlisp", ".erl": "erlang", ".ex": "elixir", ".exs": "elixir",
    ".jl": "julia", ".r": "r", ".R": "r", ".cr": "crystal",
    ".d": "d", ".nim": "nim", ".zig": "zig", ".v": "v",
    ".dart": "dart", ".groovy": "groovy", ".scala": "scala",
    ".f90": "fortran", ".f95": "fortran", ".cob": "cobol",
    ".pro": "prolog", ".forth": "forth", ".4th": "forth",
    ".tcl": "tcl", ".raku": "raku", ".m": "objc",
}


def get_api_keys(args_key=None):
    """Get API keys from args or environment. Returns (public_key, secret_key)."""
    # Try new split key format first
    public_key = os.environ.get("UNSANDBOX_PUBLIC_KEY")
    secret_key = os.environ.get("UNSANDBOX_SECRET_KEY")

    # Fall back to old single key format for backwards compatibility
    if not public_key or not secret_key:
        old_key = args_key or os.environ.get("UNSANDBOX_API_KEY")
        if old_key:
            # Old format: use the key as secret, derive public from it or use as-is
            public_key = old_key
            secret_key = old_key
        else:
            print(f"{RED}Error: UNSANDBOX_PUBLIC_KEY and UNSANDBOX_SECRET_KEY not set{RESET}", file=sys.stderr)
            print(f"{RED}       (or legacy UNSANDBOX_API_KEY for backwards compatibility){RESET}", file=sys.stderr)
            sys.exit(1)

    return public_key, secret_key


def detect_language(filename, exit_on_error=True):
    """Detect language from file extension"""
    ext = os.path.splitext(filename)[1].lower()
    lang = EXT_MAP.get(ext)
    if not lang:
        # Try reading shebang
        try:
            with open(filename, 'r') as f:
                first_line = f.readline()
                if first_line.startswith('#!'):
                    if 'python' in first_line: return 'python'
                    if 'node' in first_line: return 'javascript'
                    if 'ruby' in first_line: return 'ruby'
                    if 'perl' in first_line: return 'perl'
                    if 'bash' in first_line or '/sh' in first_line: return 'bash'
                    if 'lua' in first_line: return 'lua'
                    if 'php' in first_line: return 'php'
        except:
            pass
        if exit_on_error:
            print(f"{RED}Error: Cannot detect language for {filename}{RESET}", file=sys.stderr)
            sys.exit(1)
        return None
    return lang


def read_file(filepath):
    """Read file contents - helper for tests"""
    with open(filepath, 'r') as f:
        return f.read()


def execute_code(language, code, public_key=None, secret_key=None):
    """Execute code and return result - helper for tests"""
    if not public_key:
        public_key, secret_key = get_api_keys()
    return api_request("/execute", method="POST", data={"language": language, "code": code}, public_key=public_key, secret_key=secret_key)


def api_request(endpoint, method="GET", data=None, public_key=None, secret_key=None):
    """Make API request with HMAC authentication"""
    url = f"{API_BASE}{endpoint}"

    # Prepare body
    body = json.dumps(data) if data else ""

    # Generate HMAC signature
    timestamp = str(int(time.time()))
    signature_input = f"{timestamp}:{method}:{endpoint}:{body}"
    signature = hmac.new(
        secret_key.encode('utf-8'),
        signature_input.encode('utf-8'),
        hashlib.sha256
    ).hexdigest()

    headers = {
        "Authorization": f"Bearer {public_key}",
        "X-Timestamp": timestamp,
        "X-Signature": signature,
        "Content-Type": "application/json"
    }

    req = urllib.request.Request(url, method=method, headers=headers)
    if data:
        req.data = body.encode('utf-8')

    try:
        with urllib.request.urlopen(req, timeout=300) as resp:
            return json.loads(resp.read().decode('utf-8'))
    except urllib.error.HTTPError as e:
        error_body = e.read().decode('utf-8') if e.fp else str(e)
        if e.code == 401 and 'timestamp' in error_body.lower():
            print(f"{RED}Error: Request timestamp expired (must be within 5 minutes of server time){RESET}", file=sys.stderr)
            print(f"{YELLOW}Your computer's clock may have drifted.{RESET}", file=sys.stderr)
            print("Check your system time and sync with NTP if needed:", file=sys.stderr)
            print("  Linux:   sudo ntpdate -s time.nist.gov", file=sys.stderr)
            print("  macOS:   sudo sntp -sS time.apple.com", file=sys.stderr)
            print("  Windows: w32tm /resync", file=sys.stderr)
        else:
            print(f"{RED}Error: HTTP {e.code} - {error_body}{RESET}", file=sys.stderr)
        sys.exit(1)
    except urllib.error.URLError as e:
        print(f"{RED}Error: {e.reason}{RESET}", file=sys.stderr)
        sys.exit(1)


def api_request_text(endpoint, method="PUT", body="", public_key=None, secret_key=None):
    """Make API request with text/plain body and HMAC authentication"""
    url = f"{API_BASE}{endpoint}"

    # Generate HMAC signature
    timestamp = str(int(time.time()))
    signature_input = f"{timestamp}:{method}:{endpoint}:{body}"
    signature = hmac.new(
        secret_key.encode('utf-8'),
        signature_input.encode('utf-8'),
        hashlib.sha256
    ).hexdigest()

    headers = {
        "Authorization": f"Bearer {public_key}",
        "X-Timestamp": timestamp,
        "X-Signature": signature,
        "Content-Type": "text/plain"
    }

    req = urllib.request.Request(url, method=method, headers=headers)
    if body:
        req.data = body.encode('utf-8')

    try:
        with urllib.request.urlopen(req, timeout=300) as resp:
            return json.loads(resp.read().decode('utf-8'))
    except urllib.error.HTTPError as e:
        error_body = e.read().decode('utf-8') if e.fp else str(e)
        print(f"{RED}Error: HTTP {e.code} - {error_body}{RESET}", file=sys.stderr)
        return None
    except urllib.error.URLError as e:
        print(f"{RED}Error: {e.reason}{RESET}", file=sys.stderr)
        return None


# ============================================================================
# Environment Secrets Vault Functions
# ============================================================================

MAX_ENV_CONTENT_SIZE = 64 * 1024  # 64KB max env vault size


def service_env_status(public_key, secret_key, service_id):
    """Get environment vault status for a service"""
    result = api_request(f"/services/{service_id}/env", public_key=public_key, secret_key=secret_key)

    has_vault = result.get("has_vault", False)
    if not has_vault:
        print("Vault exists: no")
        print("Variable count: 0")
    else:
        print("Vault exists: yes")
        count = result.get("count", 0)
        print(f"Variable count: {count}")
        updated_at = result.get("updated_at")
        if updated_at:
            from datetime import datetime
            dt = datetime.fromtimestamp(updated_at)
            print(f"Last updated: {dt.strftime('%Y-%m-%d %H:%M:%S')}")


def service_env_set(public_key, secret_key, service_id, env_content):
    """Set environment vault for a service (PUT /services/:id/env)"""
    if not env_content or len(env_content) == 0:
        print(f"{RED}Error: No environment content provided{RESET}", file=sys.stderr)
        return False

    if len(env_content) > MAX_ENV_CONTENT_SIZE:
        print(f"{RED}Error: Environment content too large (max {MAX_ENV_CONTENT_SIZE} bytes){RESET}", file=sys.stderr)
        return False

    result = api_request_text(f"/services/{service_id}/env", method="PUT", body=env_content,
                               public_key=public_key, secret_key=secret_key)
    if result is None:
        return False

    count = result.get("count", -1)
    if count >= 0:
        print(f"{GREEN}Environment vault updated: {count} variable{'s' if count != 1 else ''}{RESET}")
    else:
        print(f"{GREEN}Environment vault updated{RESET}")

    message = result.get("message")
    if message:
        print(message)

    return True


def service_env_export(public_key, secret_key, service_id):
    """Export environment vault for a service (POST /services/:id/env/export)"""
    result = api_request(f"/services/{service_id}/env/export", method="POST", data={},
                          public_key=public_key, secret_key=secret_key)

    env_content = result.get("env", "")
    if env_content:
        print(env_content, end='')
        if not env_content.endswith('\n'):
            print()


def service_env_delete(public_key, secret_key, service_id):
    """Delete environment vault for a service (DELETE /services/:id/env)"""
    result = api_request(f"/services/{service_id}/env", method="DELETE",
                          public_key=public_key, secret_key=secret_key)

    print(f"{GREEN}Environment vault deleted{RESET}")
    message = result.get("message")
    if message:
        print(message)


def read_env_file(filepath):
    """Read .env file contents"""
    try:
        with open(filepath, 'r') as f:
            return f.read()
    except FileNotFoundError:
        print(f"{RED}Error: Env file not found: {filepath}{RESET}", file=sys.stderr)
        sys.exit(1)
    except IOError as e:
        print(f"{RED}Error reading env file: {e}{RESET}", file=sys.stderr)
        sys.exit(1)


def build_env_content(env_vars, env_file=None):
    """Build .env format content from -e flags and/or --env-file"""
    content_parts = []

    # Read from env file first
    if env_file:
        content_parts.append(read_env_file(env_file))

    # Add -e flags (these override/append to file)
    if env_vars:
        for e in env_vars:
            if '=' in e:
                content_parts.append(e)

    return '\n'.join(content_parts) if content_parts else None


def cmd_execute(args):
    """Execute source code"""
    public_key, secret_key = get_api_keys(args.api_key)

    # Check for inline mode: -s/--shell specified, or source_file doesn't exist
    inline_mode = False
    if args.exec_shell:
        inline_mode = True
        language = args.exec_shell
        code = args.source_file  # The "file" argument is actually the code
    elif not os.path.exists(args.source_file):
        # File doesn't exist - treat as inline bash code
        inline_mode = True
        language = "bash"
        code = args.source_file

    if not inline_mode:
        # Read source file
        try:
            with open(args.source_file, 'r') as f:
                code = f.read()
        except FileNotFoundError:
            print(f"{RED}Error: File not found: {args.source_file}{RESET}", file=sys.stderr)
            sys.exit(1)

        language = detect_language(args.source_file)

    # Build request payload
    payload = {
        "language": language,
        "code": code
    }

    # Add environment variables
    if args.env:
        env_vars = {}
        for e in args.env:
            if '=' in e:
                k, v = e.split('=', 1)
                env_vars[k] = v
        if env_vars:
            payload["env"] = env_vars

    # Add input files
    if args.files:
        input_files = []
        for filepath in args.files:
            try:
                with open(filepath, 'rb') as f:
                    content = base64.b64encode(f.read()).decode('utf-8')
                input_files.append({
                    "filename": os.path.basename(filepath),
                    "content_base64": content
                })
            except FileNotFoundError:
                print(f"{RED}Error: Input file not found: {filepath}{RESET}", file=sys.stderr)
                sys.exit(1)
        if input_files:
            payload["input_files"] = input_files

    # Add options
    if args.artifacts:
        payload["return_artifacts"] = True
    if args.network:
        payload["network"] = args.network
    if args.vcpu:
        payload["vcpu"] = args.vcpu

    # Execute
    result = api_request("/execute", method="POST", data=payload, public_key=public_key, secret_key=secret_key)

    # Print output
    if result.get("stdout"):
        print(f"{BLUE}{result['stdout']}{RESET}", end='')
    if result.get("stderr"):
        print(f"{RED}{result['stderr']}{RESET}", end='', file=sys.stderr)

    # Save artifacts
    if args.artifacts and result.get("artifacts"):
        out_dir = args.output_dir or "."
        os.makedirs(out_dir, exist_ok=True)
        for artifact in result["artifacts"]:
            filename = artifact.get("filename", "artifact")
            content = base64.b64decode(artifact["content_base64"])
            path = os.path.join(out_dir, filename)
            with open(path, 'wb') as f:
                f.write(content)
            os.chmod(path, 0o755)
            print(f"{GREEN}Saved: {path}{RESET}", file=sys.stderr)

    sys.exit(result.get("exit_code", 0))


def cmd_session(args):
    """Manage interactive sessions"""
    public_key, secret_key = get_api_keys(args.api_key)

    if args.list:
        result = api_request("/sessions", public_key=public_key, secret_key=secret_key)
        sessions = result.get("sessions", [])
        if not sessions:
            print("No active sessions")
        else:
            print(f"{'ID':<40} {'Shell':<10} {'Status':<10} {'Created'}")
            for s in sessions:
                print(f"{s.get('id', 'N/A'):<40} {s.get('shell', 'N/A'):<10} {s.get('status', 'N/A'):<10} {s.get('created_at', 'N/A')}")
        return

    if args.kill:
        result = api_request(f"/sessions/{args.kill}", method="DELETE", public_key=public_key, secret_key=secret_key)
        print(f"{GREEN}Session terminated: {args.kill}{RESET}")
        return

    if args.snapshot:
        payload = {}
        if args.snapshot_name:
            payload["name"] = args.snapshot_name
        if args.hot:
            payload["hot"] = True

        print(f"{YELLOW}Creating snapshot of session {args.snapshot}...{RESET}", file=sys.stderr)
        result = api_request(f"/sessions/{args.snapshot}/snapshot", method="POST", data=payload, public_key=public_key, secret_key=secret_key)
        print(f"{GREEN}Snapshot created successfully{RESET}")
        print(f"Snapshot ID: {result.get('id', 'N/A')}")
        return

    if args.restore:
        # --restore takes snapshot ID directly, calls /snapshots/:id/restore
        print(f"{YELLOW}Restoring from snapshot {args.restore}...{RESET}", file=sys.stderr)
        result = api_request(f"/snapshots/{args.restore}/restore", method="POST", public_key=public_key, secret_key=secret_key)
        print(f"{GREEN}Session restored from snapshot{RESET}")
        if result.get('session_id'):
            print(f"New session ID: {result.get('session_id')}")
        return

    if args.attach:
        print(f"{YELLOW}Attaching to session {args.attach}...{RESET}")
        print(f"{YELLOW}(Interactive sessions require WebSocket - use un2 for full support){RESET}")
        return

    # Create new session
    payload = {
        "shell": args.shell or "bash"
    }
    if args.network:
        payload["network"] = args.network
    if args.vcpu:
        payload["vcpu"] = args.vcpu
    if args.tmux:
        payload["persistence"] = "tmux"
    if args.screen:
        payload["persistence"] = "screen"
    if args.audit:
        payload["audit"] = True

    # Add input files
    if args.files:
        input_files = []
        for filepath in args.files:
            try:
                with open(filepath, 'rb') as f:
                    content = base64.b64encode(f.read()).decode('utf-8')
                input_files.append({
                    "filename": os.path.basename(filepath),
                    "content_base64": content
                })
            except FileNotFoundError:
                print(f"{RED}Error: Input file not found: {filepath}{RESET}", file=sys.stderr)
                sys.exit(1)
        if input_files:
            payload["input_files"] = input_files

    print(f"{YELLOW}Creating session...{RESET}")
    result = api_request("/sessions", method="POST", data=payload, public_key=public_key, secret_key=secret_key)
    print(f"{GREEN}Session created: {result.get('id', 'N/A')}{RESET}")
    print(f"{YELLOW}(Interactive sessions require WebSocket - use un2 for full support){RESET}")


def validate_key(public_key, secret_key, extend=False):
    """Validate API key and display information"""
    url = f"{PORTAL_BASE}/keys/validate"

    # Generate HMAC signature for portal request
    timestamp = str(int(time.time()))
    endpoint = "/keys/validate"
    body = ""
    signature_input = f"{timestamp}:POST:{endpoint}:{body}"
    signature = hmac.new(
        secret_key.encode('utf-8'),
        signature_input.encode('utf-8'),
        hashlib.sha256
    ).hexdigest()

    headers = {
        "Authorization": f"Bearer {public_key}",
        "X-Timestamp": timestamp,
        "X-Signature": signature,
        "Content-Type": "application/json"
    }

    req = urllib.request.Request(url, method="POST", headers=headers)

    try:
        with urllib.request.urlopen(req, timeout=30) as resp:
            result = json.loads(resp.read().decode('utf-8'))

            # Handle --extend flag
            if extend:
                public_key = result.get("public_key")
                if public_key:
                    extend_url = f"{PORTAL_BASE}/keys/extend?pk={public_key}"
                    print(f"{BLUE}Opening browser to extend key...{RESET}")
                    webbrowser.open(extend_url)
                    return
                else:
                    print(f"{RED}Error: Could not retrieve public key{RESET}", file=sys.stderr)
                    sys.exit(1)

            # Check if key is expired
            if result.get("expired", False):
                print(f"{RED}Expired{RESET}")
                print(f"Public Key: {result.get('public_key', 'N/A')}")
                print(f"Tier: {result.get('tier', 'N/A')}")
                print(f"Expired: {result.get('expires_at', 'N/A')}")
                print(f"{YELLOW}To renew: Visit https://unsandbox.com/keys/extend{RESET}")
                sys.exit(1)

            # Valid key
            print(f"{GREEN}Valid{RESET}")
            print(f"Public Key: {result.get('public_key', 'N/A')}")
            print(f"Tier: {result.get('tier', 'N/A')}")
            print(f"Status: {result.get('status', 'N/A')}")
            print(f"Expires: {result.get('expires_at', 'N/A')}")
            print(f"Time Remaining: {result.get('time_remaining', 'N/A')}")
            print(f"Rate Limit: {result.get('rate_limit', 'N/A')}")
            print(f"Burst: {result.get('burst', 'N/A')}")
            print(f"Concurrency: {result.get('concurrency', 'N/A')}")

    except urllib.error.HTTPError as e:
        error_body = e.read().decode('utf-8') if e.fp else str(e)
        try:
            error_json = json.loads(error_body)
            reason = error_json.get("error", error_body)
        except:
            reason = error_body
        print(f"{RED}Invalid{RESET}")
        print(f"Reason: {reason}")
        sys.exit(1)
    except urllib.error.URLError as e:
        print(f"{RED}Error: {e.reason}{RESET}", file=sys.stderr)
        sys.exit(1)


def cmd_key(args):
    """Validate API key"""
    public_key, secret_key = get_api_keys(args.key)
    validate_key(public_key, secret_key, extend=args.extend)


def cmd_snapshot(args):
    """Manage snapshots"""
    public_key, secret_key = get_api_keys(args.api_key)

    if args.list:
        result = api_request("/snapshots", public_key=public_key, secret_key=secret_key)
        snapshots = result.get("snapshots", [])
        if not snapshots:
            print("No snapshots found")
        else:
            print(f"{'ID':<40} {'Name':<20} {'Type':<12} {'Source ID':<30} {'Size':<10}")
            for s in snapshots:
                print(f"{s.get('id', 'N/A'):<40} {s.get('name', '-'):<20} {s.get('source_type', 'N/A'):<12} {s.get('source_id', 'N/A'):<30} {s.get('size', 'N/A'):<10}")
        return

    if args.info:
        result = api_request(f"/snapshots/{args.info}", public_key=public_key, secret_key=secret_key)
        print(f"{BLUE}Snapshot Details{RESET}\n")
        print(f"Snapshot ID: {result.get('id', 'N/A')}")
        print(f"Name: {result.get('name', '-')}")
        print(f"Source Type: {result.get('source_type', 'N/A')}")
        print(f"Source ID: {result.get('source_id', 'N/A')}")
        print(f"Size: {result.get('size', 'N/A')}")
        print(f"Created: {result.get('created_at', 'N/A')}")
        print(f"Hot Snapshot: {result.get('hot', 'N/A')}")
        return

    if args.delete:
        result = api_request(f"/snapshots/{args.delete}", method="DELETE", public_key=public_key, secret_key=secret_key)
        print(f"{GREEN}Snapshot deleted successfully{RESET}")
        return

    if args.clone:
        if not args.type:
            print(f"{RED}Error: --type required for --clone (session or service){RESET}", file=sys.stderr)
            sys.exit(1)
        if args.type not in ["session", "service"]:
            print(f"{RED}Error: --type must be 'session' or 'service'{RESET}", file=sys.stderr)
            sys.exit(1)

        payload = {"type": args.type}
        if args.name:
            payload["name"] = args.name
        if args.shell:
            payload["shell"] = args.shell
        if args.ports:
            payload["ports"] = [int(p) for p in args.ports.split(',')]

        result = api_request(f"/snapshots/{args.clone}/clone", method="POST", data=payload, public_key=public_key, secret_key=secret_key)

        if args.type == "session":
            print(f"{GREEN}Session created from snapshot{RESET}")
            print(f"Session ID: {result.get('id', 'N/A')}")
        else:
            print(f"{GREEN}Service created from snapshot{RESET}")
            print(f"Service ID: {result.get('id', 'N/A')}")
        return

    print(f"{RED}Error: Specify --list, --info ID, --delete ID, or --clone ID --type TYPE{RESET}", file=sys.stderr)
    sys.exit(1)


def cmd_service(args):
    """Manage persistent services"""
    public_key, secret_key = get_api_keys(args.api_key)

    # Handle env subcommand: un.py service env <action> <id>
    if getattr(args, 'subcommand', None) == "env":
        action = getattr(args, 'env_action', None)
        target = getattr(args, 'env_target', None)

        if not action:
            print(f"{RED}Error: env action required (status, set, export, delete){RESET}", file=sys.stderr)
            sys.exit(1)
        if not target:
            print(f"{RED}Error: Service ID required for env command{RESET}", file=sys.stderr)
            sys.exit(1)

        if action == "status":
            service_env_status(public_key, secret_key, target)
            return
        elif action == "set":
            env_content = build_env_content(args.env, args.env_file)
            if not env_content:
                # Try reading from stdin
                import select
                if select.select([sys.stdin], [], [], 0.0)[0]:
                    env_content = sys.stdin.read()
            if not env_content:
                print(f"{RED}Error: No env content provided. Use -e KEY=VAL, --env-file, or pipe to stdin{RESET}", file=sys.stderr)
                sys.exit(1)
            service_env_set(public_key, secret_key, target, env_content)
            return
        elif action == "export":
            service_env_export(public_key, secret_key, target)
            return
        elif action == "delete":
            service_env_delete(public_key, secret_key, target)
            return
        else:
            print(f"{RED}Error: Unknown env action '{action}'. Use: status, set, export, delete{RESET}", file=sys.stderr)
            sys.exit(1)

    if args.list:
        result = api_request("/services", public_key=public_key, secret_key=secret_key)
        services = result.get("services", [])
        if not services:
            print("No services")
        else:
            print(f"{'ID':<20} {'Name':<15} {'Status':<10} {'Ports':<15} {'Domains'}")
            for s in services:
                ports = ','.join(map(str, s.get('ports', [])))
                domains = ','.join(s.get('domains', []))
                print(f"{s.get('id', 'N/A'):<20} {s.get('name', 'N/A'):<15} {s.get('status', 'N/A'):<10} {ports:<15} {domains}")
        return

    if args.info:
        result = api_request(f"/services/{args.info}", public_key=public_key, secret_key=secret_key)
        print(json.dumps(result, indent=2))
        return

    if args.logs:
        result = api_request(f"/services/{args.logs}/logs", public_key=public_key, secret_key=secret_key)
        print(result.get("logs", ""))
        return

    if args.tail:
        result = api_request(f"/services/{args.tail}/logs?lines=9000", public_key=public_key, secret_key=secret_key)
        print(result.get("logs", ""))
        return

    if args.sleep:
        result = api_request(f"/services/{args.sleep}/sleep", method="POST", public_key=public_key, secret_key=secret_key)
        print(f"{GREEN}Service sleeping: {args.sleep}{RESET}")
        return

    if args.wake:
        result = api_request(f"/services/{args.wake}/wake", method="POST", public_key=public_key, secret_key=secret_key)
        print(f"{GREEN}Service waking: {args.wake}{RESET}")
        return

    if args.destroy:
        result = api_request(f"/services/{args.destroy}", method="DELETE", public_key=public_key, secret_key=secret_key)
        print(f"{GREEN}Service destroyed: {args.destroy}{RESET}")
        return

    if args.resize:
        if not args.vcpu:
            print(f"{RED}Error: --vcpu required with --resize{RESET}", file=sys.stderr)
            sys.exit(1)
        payload = {"vcpu": args.vcpu}
        result = api_request(f"/services/{args.resize}", method="PATCH", data=payload, public_key=public_key, secret_key=secret_key)
        print(f"{GREEN}Service resized to {args.vcpu} vCPU, {args.vcpu * 2}GB RAM{RESET}")
        return

    if args.snapshot:
        payload = {}
        if args.snapshot_name:
            payload["name"] = args.snapshot_name
        if args.hot:
            payload["hot"] = True

        print(f"{YELLOW}Creating snapshot of service {args.snapshot}...{RESET}", file=sys.stderr)
        result = api_request(f"/services/{args.snapshot}/snapshot", method="POST", data=payload, public_key=public_key, secret_key=secret_key)
        print(f"{GREEN}Snapshot created successfully{RESET}")
        print(f"Snapshot ID: {result.get('id', 'N/A')}")
        return

    if args.restore:
        # --restore takes snapshot ID directly, calls /snapshots/:id/restore
        print(f"{YELLOW}Restoring from snapshot {args.restore}...{RESET}", file=sys.stderr)
        result = api_request(f"/snapshots/{args.restore}/restore", method="POST", public_key=public_key, secret_key=secret_key)
        print(f"{GREEN}Service restored from snapshot{RESET}")
        if result.get('service_id'):
            print(f"New service ID: {result.get('service_id')}")
        return

    if args.execute:
        payload = {"command": args.command}
        result = api_request(f"/services/{args.execute}/execute", method="POST", data=payload, public_key=public_key, secret_key=secret_key)
        if result.get("stdout"):
            print(f"{BLUE}{result['stdout']}{RESET}", end='')
        if result.get("stderr"):
            print(f"{RED}{result['stderr']}{RESET}", end='', file=sys.stderr)
        return

    if args.dump_bootstrap:
        print(f"Fetching bootstrap script from {args.dump_bootstrap}...", file=sys.stderr)
        payload = {"command": "cat /tmp/bootstrap.sh"}
        result = api_request(f"/services/{args.dump_bootstrap}/execute", method="POST", data=payload, public_key=public_key, secret_key=secret_key)

        if result.get("stdout"):
            bootstrap = result["stdout"]
            if args.dump_file:
                # Write to file
                try:
                    with open(args.dump_file, 'w') as f:
                        f.write(bootstrap)
                    os.chmod(args.dump_file, 0o755)
                    print(f"Bootstrap saved to {args.dump_file}")
                except IOError as e:
                    print(f"{RED}Error: Could not write to {args.dump_file}: {e}{RESET}", file=sys.stderr)
                    sys.exit(1)
            else:
                # Print to stdout
                print(bootstrap, end='')
        else:
            print(f"{RED}Error: Failed to fetch bootstrap (service not running or no bootstrap file){RESET}", file=sys.stderr)
            sys.exit(1)
        return

    # Create new service
    if args.name:
        payload = {"name": args.name}
        if args.ports:
            payload["ports"] = [int(p) for p in args.ports.split(',')]
        if args.domains:
            payload["domains"] = args.domains.split(',')
        if args.service_type:
            payload["service_type"] = args.service_type
        if args.bootstrap:
            payload["bootstrap"] = args.bootstrap
        if args.bootstrap_file:
            if not os.path.exists(args.bootstrap_file):
                print(f"{RED}Error: Bootstrap file not found: {args.bootstrap_file}{RESET}", file=sys.stderr)
                sys.exit(1)
            with open(args.bootstrap_file, 'r') as f:
                payload["bootstrap_content"] = f.read()
        if args.files:
            input_files = []
            for filepath in args.files:
                try:
                    with open(filepath, 'rb') as f:
                        content = base64.b64encode(f.read()).decode('utf-8')
                    input_files.append({
                        "filename": os.path.basename(filepath),
                        "content_base64": content
                    })
                except FileNotFoundError:
                    print(f"{RED}Error: Input file not found: {filepath}{RESET}", file=sys.stderr)
                    sys.exit(1)
            if input_files:
                payload["input_files"] = input_files
        if args.network:
            payload["network"] = args.network
        if args.vcpu:
            payload["vcpu"] = args.vcpu

        result = api_request("/services", method="POST", data=payload, public_key=public_key, secret_key=secret_key)
        created_id = result.get('id')
        print(f"{GREEN}Service created: {created_id or 'N/A'}{RESET}")
        print(f"Name: {result.get('name', 'N/A')}")
        if result.get('url'):
            print(f"URL: {result.get('url')}")

        # Set environment vault if -e or --env-file provided
        if created_id:
            env_content = build_env_content(args.env, args.env_file)
            if env_content:
                print(f"{YELLOW}Setting environment vault...{RESET}", file=sys.stderr)
                if not service_env_set(public_key, secret_key, created_id, env_content):
                    print(f"{YELLOW}Warning: Failed to set environment vault{RESET}", file=sys.stderr)
        return

    print(f"{RED}Error: Specify --name to create a service, or use --list, --info, etc.{RESET}", file=sys.stderr)
    sys.exit(1)


def main():
    parser = argparse.ArgumentParser(
        description="Unsandbox CLI - Execute code in secure sandboxes",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s script.py                    Execute Python script
  %(prog)s -e DEBUG=1 script.py         With environment variable
  %(prog)s -f data.csv process.py       With input file
  %(prog)s -a -o ./bin main.c           Save compiled artifacts
  %(prog)s session                      Interactive bash session
  %(prog)s session --shell python3      Python REPL
  %(prog)s session --list               List active sessions
  %(prog)s service --name web --ports 80 --bootstrap "python -m http.server"
  %(prog)s service --name app --ports 8000 --bootstrap-file ./setup.sh
  %(prog)s service --list               List all services
        """
    )

    # Common options
    parser.add_argument("-k", "--api-key", help="API key (or set UNSANDBOX_API_KEY)")
    parser.add_argument("-n", "--network", choices=["zerotrust", "semitrusted"], help="Network mode")
    parser.add_argument("-v", "--vcpu", type=int, choices=range(1, 9), help="vCPU count (1-8)")

    subparsers = parser.add_subparsers(dest="command")

    # Key subcommand
    key_parser = subparsers.add_parser("key", help="Validate API key")
    key_parser.add_argument("-k", "--key", help="API key to validate (or set UNSANDBOX_API_KEY)")
    key_parser.add_argument("--extend", action="store_true", help="Open browser to extend key")

    # Session subcommand
    session_parser = subparsers.add_parser("session", help="Interactive shell/REPL sessions")
    session_parser.add_argument("-s", "--shell", help="Shell/REPL to use (default: bash)")
    session_parser.add_argument("-l", "--list", action="store_true", help="List active sessions")
    session_parser.add_argument("--attach", metavar="ID", help="Reconnect to session")
    session_parser.add_argument("--kill", metavar="ID", help="Terminate session")
    session_parser.add_argument("--snapshot", metavar="SESSION_ID", help="Create snapshot of session")
    session_parser.add_argument("--restore", metavar="SNAPSHOT_ID", help="Restore from snapshot ID")
    session_parser.add_argument("--snapshot-name", metavar="NAME", help="Name for snapshot")
    session_parser.add_argument("--hot", action="store_true", help="Hot snapshot (no freeze)")
    session_parser.add_argument("--audit", action="store_true", help="Record session")
    session_parser.add_argument("--tmux", action="store_true", help="Enable tmux persistence")
    session_parser.add_argument("--screen", action="store_true", help="Enable screen persistence")
    session_parser.add_argument("-f", "--files", action="append", metavar="FILE", help="Add input file")
    session_parser.add_argument("-n", "--network", choices=["zerotrust", "semitrusted"])
    session_parser.add_argument("-v", "--vcpu", type=int, choices=range(1, 9))
    session_parser.add_argument("-k", "--api-key")

    # Service subcommand
    service_parser = subparsers.add_parser("service", help="Persistent services")
    # For env subcommand: un.py service env <action> <id>
    service_parser.add_argument("subcommand", nargs="?", help="Subcommand (env)")
    service_parser.add_argument("env_action", nargs="?", help="Env vault action (status, set, export, delete)")
    service_parser.add_argument("env_target", nargs="?", help="Service ID for env command")
    service_parser.add_argument("--name", help="Service name")
    service_parser.add_argument("--ports", help="Comma-separated ports")
    service_parser.add_argument("--domains", help="Comma-separated custom domains")
    service_parser.add_argument("--type", dest="service_type", help="Service type for SRV records (minecraft, mumble, teamspeak, source, tcp, udp)")
    service_parser.add_argument("--bootstrap", help="Bootstrap command or URI")
    service_parser.add_argument("--bootstrap-file", dest="bootstrap_file", help="Upload local file as bootstrap script")
    service_parser.add_argument("-f", "--files", action="append", metavar="FILE", help="Add input file")
    service_parser.add_argument("-e", "--env", action="append", metavar="KEY=VALUE", help="Set environment variable (stored in vault)")
    service_parser.add_argument("--env-file", dest="env_file", metavar="FILE", help="Load env vars from .env file")
    service_parser.add_argument("-l", "--list", action="store_true", help="List services")
    service_parser.add_argument("--info", metavar="ID", help="Get service details")
    service_parser.add_argument("--tail", metavar="ID", help="Get last 9000 lines of logs")
    service_parser.add_argument("--logs", metavar="ID", help="Get all logs")
    service_parser.add_argument("--freeze", "--sleep", dest="sleep", metavar="ID", help="Freeze service")
    service_parser.add_argument("--unfreeze", "--wake", dest="wake", metavar="ID", help="Unfreeze service")
    service_parser.add_argument("--destroy", metavar="ID", help="Destroy service")
    service_parser.add_argument("--resize", metavar="ID", help="Resize service vCPU/memory")
    service_parser.add_argument("--snapshot", metavar="SERVICE_ID", help="Create snapshot of service")
    service_parser.add_argument("--restore", metavar="SNAPSHOT_ID", help="Restore from snapshot ID")
    service_parser.add_argument("--snapshot-name", metavar="NAME", help="Name for snapshot")
    service_parser.add_argument("--hot", action="store_true", help="Hot snapshot (no freeze)")
    service_parser.add_argument("--execute", metavar="ID", help="Execute command in service")
    service_parser.add_argument("--command", help="Command to execute (with --execute)")
    service_parser.add_argument("--dump-bootstrap", metavar="ID", help="Dump bootstrap script")
    service_parser.add_argument("--dump-file", metavar="FILE", help="File to save bootstrap (with --dump-bootstrap)")
    service_parser.add_argument("-n", "--network", choices=["zerotrust", "semitrusted"])
    service_parser.add_argument("-v", "--vcpu", type=int, choices=range(1, 9))
    service_parser.add_argument("-k", "--api-key")

    # Snapshot subcommand
    snapshot_parser = subparsers.add_parser("snapshot", help="Manage container snapshots")
    snapshot_parser.add_argument("-l", "--list", action="store_true", help="List all snapshots")
    snapshot_parser.add_argument("--info", metavar="ID", help="Get snapshot details")
    snapshot_parser.add_argument("--delete", metavar="ID", help="Delete a snapshot")
    snapshot_parser.add_argument("--clone", metavar="ID", help="Clone snapshot to new session/service")
    snapshot_parser.add_argument("--type", help="Type for clone (session or service)")
    snapshot_parser.add_argument("--name", help="Name for cloned session/service")
    snapshot_parser.add_argument("--shell", help="Shell for cloned session")
    snapshot_parser.add_argument("--ports", help="Ports for cloned service")
    snapshot_parser.add_argument("-k", "--api-key")

    # Execute options (default command)
    parser.add_argument("source_file", nargs="?", help="Source file to execute")
    parser.add_argument("-s", "--shell", dest="exec_shell", metavar="LANG", help="Execute inline code with specified language (defaults to bash if arg is not a file)")
    parser.add_argument("-e", "--env", action="append", metavar="KEY=VALUE", help="Set environment variable")
    parser.add_argument("-f", "--files", action="append", metavar="FILE", help="Add input file")
    parser.add_argument("-a", "--artifacts", action="store_true", help="Return artifacts")
    parser.add_argument("-o", "--output-dir", help="Output directory for artifacts")
    parser.add_argument("-y", "--yes", action="store_true", help="Skip confirmations")

    args = parser.parse_args()

    if args.command == "key":
        cmd_key(args)
    elif args.command == "session":
        cmd_session(args)
    elif args.command == "service":
        cmd_service(args)
    elif args.command == "snapshot":
        cmd_snapshot(args)
    elif args.source_file:
        cmd_execute(args)
    else:
        parser.print_help()
        sys.exit(1)


if __name__ == "__main__":
    main()

License

PUBLIC DOMAIN - NO LICENSE, NO WARRANTY

This is free public domain software for the public good of a permacomputer hosted
at permacomputer.com - an always-on computer by the people, for the people. One
that is durable, easy to repair, and distributed like tap water for machine
learning intelligence.

The permacomputer is community-owned infrastructure optimized around four values:

  TRUTH    - First principles, math & science, open source code freely distributed
  FREEDOM  - Voluntary partnerships, freedom from tyranny & corporate control
  HARMONY  - Minimal waste, self-renewing systems with diverse thriving connections
  LOVE     - Be yourself without hurting others, cooperation through natural law

This software contributes to that vision by enabling code execution across all 42
programming languages through a unified interface, accessible to everyone. Code is
seeds to sprout on any abandoned technology.

Learn more: https://www.permacomputer.com

Anyone is free to copy, modify, publish, use, compile, sell, or distribute this
software, either in source code form or as a compiled binary, for any purpose,
commercial or non-commercial, and by any means.

NO WARRANTY. THE SOFTWARE IS PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND.

That said, our permacomputer's digital membrane stratum continuously runs unit,
integration, and functional tests on all its own software - with our permacomputer
monitoring itself, repairing itself, with minimal human guidance in the loop.
Our agents do their best.

Copyright 2025 TimeHexOn & foxhop & russell@unturf
https://www.timehexon.com
https://www.foxhop.net
https://www.unturf.com/software