"""Passbolt CLI integration for secure credential management.""" import subprocess import json import os from dataclasses import dataclass from typing import Optional, List, Dict, Any from enum import Enum from pathlib import Path class PassboltResourceType(Enum): """Types of resources in Passbolt.""" PASSWORD = "password" PASSWORD_WITH_DESCRIPTION = "password-with-description" PASSWORD_STRING = "password-string" TOTP = "totp" class PassboltError(Exception): """Exception raised for Passbolt operations.""" pass @dataclass class PassboltCredential: """Represents credentials retrieved from Passbolt.""" resource_id: str name: str username: Optional[str] = None password: Optional[str] = None uri: Optional[str] = None description: Optional[str] = None resource_type: PassboltResourceType = PassboltResourceType.PASSWORD @dataclass class PassboltResource: """Represents a Passbolt resource.""" id: str name: str username: Optional[str] = None uri: Optional[str] = None resource_type: str = "password" folder_parent_id: Optional[str] = None personal: bool = False class PassboltClient: """Client for interacting with Passbolt through the CLI.""" def __init__(self, passbolt_cli_path: str = "passbolt"): """Initialize Passbolt client. Args: passbolt_cli_path: Path to the passbolt CLI executable """ self.cli_path = passbolt_cli_path self._check_cli_available() self._check_authentication() def _check_cli_available(self) -> None: """Check if Passbolt CLI is available.""" try: subprocess.run([self.cli_path, '--version'], capture_output=True, check=True) except (subprocess.CalledProcessError, FileNotFoundError): raise PassboltError( f"Passbolt CLI not found at '{self.cli_path}'. " "Please install: https://github.com/passbolt/go-passbolt-cli" ) def _check_authentication(self) -> None: """Check if authenticated with Passbolt.""" try: # Try to list resources to check auth self._run_passbolt(['list', '--json'], check=True) except PassboltError: raise PassboltError( "Not authenticated with Passbolt. " "Please run: passbolt auth login" ) def _run_passbolt(self, args: List[str], check: bool = True) -> subprocess.CompletedProcess: """Run a Passbolt CLI command. Args: args: Command arguments check: Whether to check return code Returns: Completed process result """ try: result = subprocess.run( [self.cli_path] + args, capture_output=True, text=True, check=check ) return result except subprocess.CalledProcessError as e: raise PassboltError(f"Passbolt command failed: {e.stderr}") def get_credential(self, resource_id: str) -> PassboltCredential: """Get a credential by resource ID. Args: resource_id: UUID of the Passbolt resource Returns: PassboltCredential object with username and password """ # Get the full resource result = self._run_passbolt(['get', '--id', resource_id, '--json']) try: data = json.loads(result.stdout) except json.JSONDecodeError: raise PassboltError(f"Failed to parse Passbolt response") # Extract fields based on resource type credential = PassboltCredential( resource_id=resource_id, name=data.get('name', ''), username=data.get('username'), password=data.get('password'), uri=data.get('uri'), description=data.get('description') ) # Determine resource type if 'resource_type' in data: try: credential.resource_type = PassboltResourceType( data['resource_type']) except ValueError: pass # Keep default return credential def get_field(self, resource_id: str, field: str) -> str: """Get a specific field from a resource. Args: resource_id: UUID of the Passbolt resource field: Field name (e.g., 'password', 'username', 'uri') Returns: Field value as string """ result = self._run_passbolt( ['get', '--id', resource_id, '--field', field]) return result.stdout.strip() def get_password(self, resource_id: str) -> str: """Get just the password for a resource. Args: resource_id: UUID of the Passbolt resource Returns: Password string """ return self.get_field(resource_id, 'password') def get_username(self, resource_id: str) -> str: """Get just the username for a resource. Args: resource_id: UUID of the Passbolt resource Returns: Username string """ return self.get_field(resource_id, 'username') def list_resources(self, folder_id: Optional[str] = None, search: Optional[str] = None) -> List[PassboltResource]: """List available resources. Args: folder_id: Optional folder ID to filter by search: Optional search term Returns: List of PassboltResource objects """ args = ['list', '--json'] if folder_id: args.extend(['--folder', folder_id]) if search: args.extend(['--filter', search]) result = self._run_passbolt(args) try: data = json.loads(result.stdout) except json.JSONDecodeError: return [] resources = [] for item in data: resources.append(PassboltResource( id=item['id'], name=item.get('name', ''), username=item.get('username'), uri=item.get('uri'), resource_type=item.get('resource_type', 'password'), folder_parent_id=item.get('folder_parent_id'), personal=item.get('personal', False) )) return resources def find_resource_by_name(self, name: str) -> Optional[PassboltResource]: """Find a resource by name. Args: name: Name of the resource to find Returns: First matching PassboltResource or None """ resources = self.list_resources(search=name) for resource in resources: if resource.name == name: return resource return None def create_resource(self, name: str, username: str, password: str, uri: Optional[str] = None, description: Optional[str] = None, folder_id: Optional[str] = None) -> str: """Create a new password resource. Args: name: Resource name username: Username password: Password uri: Optional URI/URL description: Optional description folder_id: Optional folder to place resource in Returns: ID of created resource """ args = ['create', 'resource', '--name', name, '--username', username, '--password', password] if uri: args.extend(['--uri', uri]) if description: args.extend(['--description', description]) if folder_id: args.extend(['--folder', folder_id]) result = self._run_passbolt(args) # Parse the ID from output # Output format: "Resource created: " for line in result.stdout.split('\n'): if 'created' in line.lower() and ':' in line: parts = line.split(':', 1) if len(parts) == 2: return parts[1].strip() raise PassboltError("Failed to parse created resource ID") def update_resource(self, resource_id: str, name: Optional[str] = None, username: Optional[str] = None, password: Optional[str] = None, uri: Optional[str] = None, description: Optional[str] = None) -> None: """Update an existing resource. Args: resource_id: ID of resource to update name: New name (optional) username: New username (optional) password: New password (optional) uri: New URI (optional) description: New description (optional) """ args = ['update', 'resource', '--id', resource_id] if name: args.extend(['--name', name]) if username: args.extend(['--username', username]) if password: args.extend(['--password', password]) if uri: args.extend(['--uri', uri]) if description: args.extend(['--description', description]) self._run_passbolt(args) def delete_resource(self, resource_id: str) -> None: """Delete a resource. Args: resource_id: ID of resource to delete """ self._run_passbolt(['delete', 'resource', '--id', resource_id]) def share_resource(self, resource_id: str, user_id: str, permission: str = "read") -> None: """Share a resource with another user. Args: resource_id: ID of resource to share user_id: ID of user to share with permission: Permission level ('read', 'update', 'owner') """ self._run_passbolt([ 'share', 'resource', '--id', resource_id, '--user', user_id, '--permission', permission ]) def list_folders(self) -> List[Dict[str, Any]]: """List all folders. Returns: List of folder dictionaries """ result = self._run_passbolt(['list', 'folder', '--json']) try: return json.loads(result.stdout) except json.JSONDecodeError: return [] def get_folder_by_name(self, name: str) -> Optional[Dict[str, Any]]: """Find a folder by name. Args: name: Folder name to search for Returns: Folder dictionary or None """ folders = self.list_folders() for folder in folders: if folder.get('name') == name: return folder return None def validate_resource_id(self, resource_id: str) -> bool: """Check if a resource ID exists and is accessible. Args: resource_id: UUID of the resource Returns: True if resource exists and is accessible """ try: self._run_passbolt(['get', '--id', resource_id, '--field', 'name']) return True except PassboltError: return False