diff --git a/CLAUDE.md b/CLAUDE.md index b26b025..25ce966 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -32,129 +32,229 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co **main.py** - Main GUI application entry point - `VPNManagerWindow` class: Primary PyGObject/GTK3-based GUI application -- Implements a two-column layout: active customers (left) vs inactive customers (right) +- Implements single-view layout with Gtk.Stack for smooth transitions - Features system tray integration using `pystray` - Uses GNOME-style theming with CSS styling for cards -- Includes search functionality across customers, locations, and hosts +- Includes advanced search functionality with wildcard support (`*`) - HeaderBar for native GNOME look and feel +- Current location tracking and display -**models.py** - Data model definitions using dataclasses and enums +**models.py** - Type-safe data model definitions using dataclasses and enums - `ServiceType`: Enum for service types (SSH, Web GUI, RDP, VNC, SMB, Database, FTP) - `HostType`: Enum for host types (Linux, Windows, Windows Server, Proxmox, ESXi, Router, Switch) - `VPNType`: Enum for VPN types (OpenVPN, WireGuard, IPSec) -- `Service`: Individual services on hosts with type-safe enums -- `Host`: Physical/virtual machines with services and sub-hosts (VMs) +- `Service`: Individual services on hosts with type-safe enums and port numbers +- `Host`: Physical/virtual machines with services and recursive sub-hosts (VMs) - `Location`: Customer locations with VPN configurations and host infrastructure - `CustomerService`: Customer's cloud/web services (O365, CRM, etc.) - `Customer`: Top-level entities containing services and locations -- Each model includes helper methods for common operations +- Each model includes comprehensive helper methods for common operations **data_loader.py** - YAML-based data management layer - `load_customers()`: Loads customer configurations from `~/.vpntray/customers/*.yaml` files -- `save_customer()`: Saves customer data back to YAML files +- `save_customer()`: Saves customer data back to YAML files with proper enum serialization - `initialize_example_customers()`: Creates example configuration files -- Robust parsing with enum conversion and error handling +- Robust parsing with enum conversion, error handling, and graceful fallbacks - Falls back to demo data if no configuration files exist +- Supports both `.yaml` and `.yml` file extensions + +**views/** - High-level UI view management (MVC pattern) +- `active_view.py`: `ActiveView` class for displaying active locations with full interaction +- `inactive_view.py`: `InactiveView` class for search results (inactive locations) +- Clean separation of concerns with dedicated view controllers +- `__init__.py`: View exports for clean imports **widgets/** - Modular UI components using PyGObject - `customer_card.py`: `ActiveCustomerCard` and `InactiveCustomerCard` classes + - Active cards: Interactive buttons for customer services and full location details + - Inactive cards: Read-only service lists and location activation buttons - `location_card.py`: `ActiveLocationCard` and `InactiveLocationCard` classes + - Active cards: Connection controls, deactivation (X button), and infrastructure details + - Inactive cards: Current location setting and activation buttons - `host_item.py`: `HostItem` class for displaying hosts and their services + - Hierarchical display supporting hypervisors with sub-hosts (VMs) + - Service buttons for direct access to SSH, Web GUI, RDP services - `__init__.py`: Widget exports for clean imports -**views/** - High-level UI view management -- `active_view.py`: `ActiveView` class for displaying active locations -- `inactive_view.py`: `InactiveView` class for search results (inactive locations) -- `__init__.py`: View exports for clean imports - **Configuration Files** -- `init_config.py`: Helper script to initialize user configuration -- `example_customer.yaml`: Complete example showing YAML schema +- `init_config.py`: Helper script to initialize user configuration with examples +- `example_customer.yaml`: Complete example showing YAML schema with all features - User config: `~/.vpntray/customers/*.yaml` - One file per customer ### Key Architecture Patterns **Hierarchical Data Structure**: Customer → Location → Host → Service -- Customers have cloud services (accessible anywhere) and multiple locations +- Customers have cloud services (accessible anywhere) and multiple physical locations - Each location has VPN configuration, connection state, and host infrastructure -- Hosts can have sub-hosts (VMs under hypervisors) and multiple services -- Services represent endpoints (SSH, Web GUI, RDP, etc.) that can be launched +- Hosts can have sub-hosts (VMs under hypervisors) with recursive nesting +- Services represent endpoints (SSH, Web GUI, RDP, etc.) with specific ports -**Active/Inactive Location Management**: -- Locations (not customers) are activated/deactivated individually -- Left column shows customers with active locations (full detail view) -- Right column shows customers with inactive locations (summary cards) -- UI automatically reorganizes based on location activation state +**Location State Management**: +- **Active/Inactive**: Locations can be activated for VPN management +- **Current Location**: User's physical location (separate from VPN connections) +- **Connection State**: VPN connection status independent of location activation +- Automatic UI updates based on state changes with immediate feedback -**Widget-Based UI Architecture**: -- Modular widget classes handle their own GTK widget creation +**Single-View UI Architecture with Stack Navigation**: +- Uses `Gtk.Stack` for smooth view transitions with crossfade animation +- **Normal mode**: Shows only active locations (full detail view) +- **Search mode**: Shows only inactive locations (activation and current location setting) +- Clean visual separation with no overlapping or confusing dual-column layouts + +**Widget-Based Component System**: +- Modular widget classes handle their own GTK widget creation and event handling - Callback system for widget-to-main-window communication -- Clean separation between data models and UI representation +- Clear separation between data models and UI representation +- Different widget behavior for active vs inactive states -**Mock Implementation**: -- Currently a UI mockup with no actual VPN functionality -- All VPN operations (connect, disconnect, routes) are placeholder methods -- Button actions update UI state but don't perform real network operations -- Rich mock data includes hypervisors with VMs, various service types +**Type-Safe Configuration System**: +- Comprehensive enum usage for all categorical data +- YAML-based configuration with robust parsing and validation +- Graceful error handling and fallback mechanisms +- Easy extensibility for new service types, host types, and VPN protocols + +### Current Location Tracking + +The application tracks two distinct location concepts: +- **Current Location**: Where the user physically is (set via "Set as Current" button) +- **Active Locations**: Locations available for VPN connections +- Current location is displayed prominently above the search bar +- Users can set current location from inactive location cards without activating VPN + +### Search and Discovery Features + +**Advanced Search Functionality**: +- Real-time search across customers, locations, hosts, and services +- **Wildcard support**: Type `*` to show all inactive locations +- **Smart filtering**: Search includes IP addresses, service types, and port numbers +- **Context switching**: Empty search shows active view, any text shows search results +- **Auto-clear**: Activating a location automatically clears search and returns to active view ### Data Flow -1. `data_loader.load_customers()` loads customer configurations from YAML files in `~/.vpntray/customers/` -2. Main window loads and filters data based on search terms (including `*` wildcard for all inactive) -3. View classes (`ActiveView`/`InactiveView`) manage display using widget components -4. User interactions trigger callbacks that update dataclass attributes -5. Changes can be persisted back to YAML files using `save_customer()` -6. UI re-renders to reflect state changes with smooth transitions via Gtk.Stack + +1. **Initialization**: `data_loader.load_customers()` loads configurations from YAML files in `~/.vpntray/customers/` +2. **UI Setup**: Creates views and widgets using callback architecture +3. **Search/Filter**: Real-time filtering with wildcard support and smart matching +4. **View Management**: `Gtk.Stack` manages smooth transitions between active/inactive views +5. **User Interactions**: Callbacks update dataclass attributes with immediate UI feedback +6. **Persistence**: Changes can be saved back to YAML files using `save_customer()` +7. **State Updates**: Location activation, connection changes, and current location updates ### UI Layout Structure + +**Modern Single-View Design**: - HeaderBar with title and subtitle (GNOME HIG compliance) -- Search entry with placeholder text for filtering (supports `*` wildcard) +- Current location display (centered, prominent) +- Search entry with comprehensive placeholder text (supports `*` wildcard) - Single-view layout using Gtk.Stack for smooth transitions -- **Normal mode**: Shows only active locations (full detail view) -- **Search mode**: Shows only inactive locations matching search term (activation cards) -- GNOME-style cards with CSS theming and proper spacing +- **Normal mode**: Active locations with full interaction (connections, services, infrastructure) +- **Search mode**: Inactive locations with activation and current location setting +- GNOME-style cards with CSS theming, proper spacing, and visual hierarchy - System tray integration for minimize-to-tray behavior ### GTK3/PyGObject Specific Features -- CSS styling for GNOME-style cards with borders, shadows, and theming -- Native GTK widgets: HeaderBar, SearchEntry, ScrolledWindow, Stack -- Smooth view transitions using Gtk.Stack with crossfade animation -- Proper GNOME HIG compliance for spacing, margins, and layout -- Button styling with suggested-action and destructive-action classes -- Thread-safe system tray integration using GLib.idle_add -### Future Extensibility -- Implement actual VPN connection logic in placeholder methods -- Add real-time VPN status monitoring and automatic reconnection -- Extend YAML schema for additional VPN configuration options -- Add import/export functionality for customer configurations -- Implement configuration validation and error reporting -- Add support for additional VPN clients and protocols -- Extend widget system for additional UI components (settings, logs, etc.) +- **CSS styling**: GNOME-style cards with borders, shadows, and adaptive theming +- **Native widgets**: HeaderBar, SearchEntry, ScrolledWindow, Stack with crossfade transitions +- **Proper GNOME HIG compliance**: Spacing, margins, typography, and layout +- **Button styling**: suggested-action and destructive-action classes for clear visual hierarchy +- **Thread-safe integration**: System tray using GLib.idle_add for thread safety +- **Responsive design**: Proper scrolling and adaptive layouts + +### Mock Implementation Status + +- Currently a UI mockup with no actual VPN functionality +- All VPN operations (connect, disconnect, routes) are placeholder methods +- Button actions update UI state but don't perform real network operations +- Rich configuration system ready for real VPN client integration +- Comprehensive infrastructure modeling supports complex network topologies ### YAML Configuration Schema + Customer files in `~/.vpntray/customers/` follow this structure: + ```yaml name: Customer Name + +# Cloud/web services (always accessible) services: - name: Service Name url: https://service.url service_type: Service Category description: Optional description + +# Physical locations with VPN and infrastructure locations: - name: Location Name vpn_type: OpenVPN|WireGuard|IPSec vpn_config: /path/to/config/file - active: true|false - connected: true|false + active: true|false # Available for VPN management + connected: true|false # Current VPN connection status + hosts: - name: Host Name ip_address: IP Address - host_type: Linux|Windows|etc + host_type: Linux|Windows|Windows Server|Proxmox|ESXi|Router|Switch description: Optional description + services: - name: Service Name - service_type: SSH|Web GUI|RDP|etc + service_type: SSH|Web GUI|RDP|VNC|SMB|Database|FTP port: Port Number - sub_hosts: # Optional VMs/containers - - # Same structure as hosts -``` \ No newline at end of file + + sub_hosts: # Optional VMs/containers (recursive structure) + - name: VM Name + ip_address: VM IP + host_type: Linux|Windows|Windows Server + services: # Same structure as parent host + - name: Service Name + service_type: SSH|Web GUI|RDP + port: Port Number +``` + +### Future Extensibility + +**VPN Integration Ready**: +- Implement actual VPN connection logic in placeholder methods +- Add real-time VPN status monitoring and automatic reconnection +- Support for multiple VPN clients (OpenVPN, WireGuard, IPSec) +- Integration with system network management + +**Configuration Management**: +- Import/export functionality for customer configurations +- Configuration validation with detailed error reporting +- Backup and restore capabilities +- Multi-user configuration support + +**UI Enhancements**: +- Settings panel for application preferences +- Connection logs and monitoring +- Network diagnostics and troubleshooting tools +- Custom themes and layout preferences +- Notification system for connection status changes + +**Advanced Features**: +- Auto-discovery of network services and hosts +- Integration with network monitoring tools +- SSH key management and authentication +- Bookmark system for frequently accessed services +- Connection history and analytics + +## Development Notes + +**Code Quality**: +- Type hints throughout codebase for better IDE support +- Comprehensive error handling with graceful degradation +- Clean separation of concerns (MVC pattern) +- Extensive use of dataclasses and enums for type safety + +**Testing Strategy**: +- Configuration can be tested with example YAML files +- UI components are modular and independently testable +- Mock data system allows UI testing without VPN dependencies + +**Performance Considerations**: +- Efficient YAML loading with caching +- Minimal UI updates with smart re-rendering +- Background thread for system tray to prevent UI blocking +- Lazy loading of complex widget hierarchies \ No newline at end of file diff --git a/data_loader.py b/data_loader.py index bc48ee6..a1ab8ad 100644 --- a/data_loader.py +++ b/data_loader.py @@ -100,14 +100,17 @@ def parse_location(location_data: Dict[str, Any]) -> Location: host = parse_host(host_data) hosts.append(host) - # Create location + # Create location (active and connected default to False - runtime state) location = Location( name=location_data['name'], vpn_type=parse_vpn_type(location_data['vpn_type']), - connected=location_data.get('connected', False), - active=location_data.get('active', False), + connected=False, # Runtime state - always starts disconnected + active=False, # Runtime state - always starts inactive vpn_config=location_data.get('vpn_config', ''), - hosts=hosts + hosts=hosts, + vpn_credentials=location_data.get('vpn_credentials'), + nmcli_connection_name=location_data.get('nmcli_connection_name'), + auto_import=location_data.get('auto_import', True) ) return location diff --git a/example_customer.yaml b/example_customer.yaml index fd908ae..60b2d30 100644 --- a/example_customer.yaml +++ b/example_customer.yaml @@ -23,8 +23,18 @@ locations: - name: Main Office vpn_type: OpenVPN vpn_config: /etc/openvpn/techcorp-main.ovpn - active: true - connected: true + + # VPN credentials - three options: + # Option 1: Dictionary with username/password + vpn_credentials: + username: vpnuser + password: securepass123 + + # Option 2: Passbolt UUID (for future implementation when CLI is updated) + # vpn_credentials: "550e8400-e29b-41d4-a716-446655440000" + + # Option 3: Omit or set to null if no credentials needed + # vpn_credentials: null # Hosts at this location hosts: @@ -111,8 +121,9 @@ locations: - name: Branch Office vpn_type: WireGuard vpn_config: /etc/wireguard/techcorp-branch.conf - active: false - connected: false + + # No credentials needed for WireGuard (uses keys in config file) + vpn_credentials: null hosts: - name: BRANCH-01 diff --git a/models.py b/models.py index 7b847ea..8eed863 100644 --- a/models.py +++ b/models.py @@ -72,6 +72,16 @@ class Location: active: bool = False vpn_config: str = "" # Path to VPN config or connection details hosts: List[Host] = field(default_factory=list) + + # VPN connection management fields + nmcli_connection_name: Optional[str] = None # NetworkManager connection name + auto_import: bool = True # Auto-import .ovpn file if not in NetworkManager + + # Credential storage - can be: + # - Passbolt UUID string (for future use) + # - Dict with 'username' and 'password' keys + # - None if no credentials needed + vpn_credentials: Optional[dict | str] = None def get_host_by_name(self, host_name: str) -> Optional[Host]: """Get a host by its name (searches recursively in sub-hosts).""" diff --git a/services/__init__.py b/services/__init__.py new file mode 100644 index 0000000..003ea75 --- /dev/null +++ b/services/__init__.py @@ -0,0 +1,17 @@ +"""Services package for VPN and password management.""" + +from .vpn_manager import VPNManager, VPNConnectionError, VPNStatus, VPNConnection +from .passbolt_client import PassboltClient, PassboltError, PassboltCredential +from .connection_manager import ConnectionManager, ConnectionConfig + +__all__ = [ + 'VPNManager', + 'VPNConnection', + 'VPNConnectionError', + 'VPNStatus', + 'PassboltClient', + 'PassboltCredential', + 'PassboltError', + 'ConnectionManager', + 'ConnectionConfig', +] \ No newline at end of file diff --git a/services/connection_manager.py b/services/connection_manager.py new file mode 100644 index 0000000..016e2de --- /dev/null +++ b/services/connection_manager.py @@ -0,0 +1,266 @@ +"""High-level connection manager that integrates VPN and Passbolt.""" + +import logging +from typing import Optional, Dict, Any +from pathlib import Path +from dataclasses import dataclass + +from .vpn_manager import VPNManager, VPNStatus, VPNConnectionError +from .passbolt_client import PassboltClient, PassboltError + + +logger = logging.getLogger(__name__) + + +@dataclass +class ConnectionConfig: + """Configuration for a VPN connection.""" + name: str + vpn_config_path: str + nmcli_connection_name: Optional[str] = None + auto_import: bool = True # Auto-import .ovpn if not in NetworkManager + # Credentials can be: + # - Passbolt UUID string (for future implementation) + # - Dict with 'username' and 'password' keys + # - None if no credentials needed + vpn_credentials: Optional[dict | str] = None + + +class ConnectionManager: + """Manages VPN connections with Passbolt credential integration.""" + + def __init__(self, use_passbolt: bool = True): + """Initialize the connection manager. + + Args: + use_passbolt: Whether to use Passbolt for credentials + """ + self.vpn_manager = VPNManager() + self.passbolt_client = None + + if use_passbolt: + try: + self.passbolt_client = PassboltClient() + logger.info("Passbolt client initialized successfully") + except PassboltError as e: + logger.warning(f"Passbolt not available: {e}") + logger.info("Falling back to manual credential entry") + + def connect_location(self, config: ConnectionConfig, + username: Optional[str] = None, + password: Optional[str] = None) -> None: + """Connect to a VPN location. + + Args: + config: Connection configuration + username: Override username (if not using Passbolt) + password: Override password (if not using Passbolt) + """ + # Ensure connection exists in NetworkManager + connection_name = self._ensure_connection(config) + + # Get credentials - check overrides first, then config + if not username or not password: + creds_username, creds_password = self._get_credentials_from_config(config) + username = username or creds_username + password = password or creds_password + + if not username or not password: + logger.info(f"No credentials provided for {connection_name}") + # nmcli will prompt for credentials + + # Connect + try: + logger.info(f"Connecting to {connection_name}") + self.vpn_manager.connect(connection_name, username, password) + logger.info(f"Successfully connected to {connection_name}") + except VPNConnectionError as e: + logger.error(f"Failed to connect to {connection_name}: {e}") + raise + + def disconnect_location(self, config: ConnectionConfig) -> None: + """Disconnect from a VPN location. + + Args: + config: Connection configuration + """ + connection_name = config.nmcli_connection_name or config.name + + if not self.vpn_manager.connection_exists(connection_name): + logger.warning(f"Connection {connection_name} does not exist") + return + + try: + logger.info(f"Disconnecting from {connection_name}") + self.vpn_manager.disconnect(connection_name) + logger.info(f"Successfully disconnected from {connection_name}") + except VPNConnectionError as e: + logger.error(f"Failed to disconnect from {connection_name}: {e}") + raise + + def get_connection_status(self, config: ConnectionConfig) -> VPNStatus: + """Get the status of a VPN connection. + + Args: + config: Connection configuration + + Returns: + Current VPN status + """ + connection_name = config.nmcli_connection_name or config.name + + if not self.vpn_manager.connection_exists(connection_name): + return VPNStatus.DISCONNECTED + + return self.vpn_manager.get_status(connection_name) + + def _ensure_connection(self, config: ConnectionConfig) -> str: + """Ensure VPN connection exists in NetworkManager. + + Args: + config: Connection configuration + + Returns: + Name of the NetworkManager connection + """ + connection_name = config.nmcli_connection_name or config.name + + # Check if connection already exists + if self.vpn_manager.connection_exists(connection_name): + logger.debug(f"Connection {connection_name} already exists") + return connection_name + + # Import if auto_import is enabled and config file exists + if config.auto_import and config.vpn_config_path: + vpn_file = Path(config.vpn_config_path) + if vpn_file.exists(): + logger.info(f"Importing VPN configuration from {vpn_file}") + imported_name = self.vpn_manager.import_ovpn( + str(vpn_file), + connection_name + ) + logger.info(f"Imported connection as {imported_name}") + return imported_name + else: + raise VPNConnectionError( + f"VPN config file not found: {config.vpn_config_path}" + ) + + raise VPNConnectionError( + f"Connection {connection_name} does not exist and auto-import is disabled" + ) + + def _get_credentials_from_config(self, config: ConnectionConfig) -> tuple[Optional[str], Optional[str]]: + """Get credentials from the configuration. + + Args: + config: Connection configuration + + Returns: + Tuple of (username, password) or (None, None) + """ + if not config.vpn_credentials: + return None, None + + # If it's a dict with username/password + if isinstance(config.vpn_credentials, dict): + username = config.vpn_credentials.get('username') + password = config.vpn_credentials.get('password') + return username, password + + # If it's a string (Passbolt UUID for future use) + if isinstance(config.vpn_credentials, str): + # For now, try to use Passbolt if available + if self.passbolt_client: + try: + return self._get_passbolt_credentials(config.vpn_credentials) + except (PassboltError, ValueError) as e: + logger.warning(f"Failed to get Passbolt credentials: {e}") + else: + logger.warning(f"Passbolt UUID provided but Passbolt client not available") + + return None, None + + def _get_passbolt_credentials(self, resource_id: str) -> tuple[str, str]: + """Get credentials from Passbolt. + + Args: + resource_id: Passbolt resource UUID + + Returns: + Tuple of (username, password) + """ + if not self.passbolt_client: + raise ValueError("Passbolt client not initialized") + + try: + credential = self.passbolt_client.get_credential(resource_id) + + if not credential.username or not credential.password: + raise ValueError( + f"Incomplete credentials for resource {resource_id}") + + return credential.username, credential.password + + except PassboltError as e: + logger.error(f"Failed to get Passbolt credentials: {e}") + raise + + def validate_passbolt_resource(self, resource_id: str) -> bool: + """Validate that a Passbolt resource exists and has required fields. + + Args: + resource_id: Passbolt resource UUID + + Returns: + True if resource is valid for VPN use + """ + if not self.passbolt_client: + return False + + try: + credential = self.passbolt_client.get_credential(resource_id) + return bool(credential.username and credential.password) + except PassboltError: + return False + + def import_all_configs(self, configs: list[ConnectionConfig]) -> Dict[str, bool]: + """Import multiple VPN configurations. + + Args: + configs: List of connection configurations + + Returns: + Dictionary mapping connection names to success status + """ + results = {} + + for config in configs: + try: + connection_name = self._ensure_connection(config) + results[connection_name] = True + logger.info(f"Successfully imported {connection_name}") + except VPNConnectionError as e: + results[config.name] = False + logger.error(f"Failed to import {config.name}: {e}") + + return results + + def cleanup_connection(self, config: ConnectionConfig, + remove_from_nm: bool = False) -> None: + """Clean up a VPN connection. + + Args: + config: Connection configuration + remove_from_nm: Whether to remove from NetworkManager + """ + connection_name = config.nmcli_connection_name or config.name + + # Disconnect if connected + if self.get_connection_status(config) == VPNStatus.CONNECTED: + self.disconnect_location(config) + + # Remove from NetworkManager if requested + if remove_from_nm and self.vpn_manager.connection_exists(connection_name): + logger.info(f"Removing {connection_name} from NetworkManager") + self.vpn_manager.delete_connection(connection_name) diff --git a/services/passbolt_client.py b/services/passbolt_client.py new file mode 100644 index 0000000..3b948b4 --- /dev/null +++ b/services/passbolt_client.py @@ -0,0 +1,369 @@ +"""Passbolt CLI integration for secure credential management.""" + +import subprocess +import json +import os +from dataclasses import dataclass +from typing import Optional, List, Dict, Any +from enum import Enum +from pathlib import Path + + +class PassboltResourceType(Enum): + """Types of resources in Passbolt.""" + PASSWORD = "password" + PASSWORD_WITH_DESCRIPTION = "password-with-description" + PASSWORD_STRING = "password-string" + TOTP = "totp" + + +class PassboltError(Exception): + """Exception raised for Passbolt operations.""" + pass + + +@dataclass +class PassboltCredential: + """Represents credentials retrieved from Passbolt.""" + resource_id: str + name: str + username: Optional[str] = None + password: Optional[str] = None + uri: Optional[str] = None + description: Optional[str] = None + resource_type: PassboltResourceType = PassboltResourceType.PASSWORD + + +@dataclass +class PassboltResource: + """Represents a Passbolt resource.""" + id: str + name: str + username: Optional[str] = None + uri: Optional[str] = None + resource_type: str = "password" + folder_parent_id: Optional[str] = None + personal: bool = False + + +class PassboltClient: + """Client for interacting with Passbolt through the CLI.""" + + def __init__(self, passbolt_cli_path: str = "passbolt"): + """Initialize Passbolt client. + + Args: + passbolt_cli_path: Path to the passbolt CLI executable + """ + self.cli_path = passbolt_cli_path + self._check_cli_available() + self._check_authentication() + + def _check_cli_available(self) -> None: + """Check if Passbolt CLI is available.""" + try: + subprocess.run([self.cli_path, '--version'], + capture_output=True, check=True) + except (subprocess.CalledProcessError, FileNotFoundError): + raise PassboltError( + f"Passbolt CLI not found at '{self.cli_path}'. " + "Please install: https://github.com/passbolt/go-passbolt-cli" + ) + + def _check_authentication(self) -> None: + """Check if authenticated with Passbolt.""" + try: + # Try to list resources to check auth + self._run_passbolt(['list', '--json'], check=True) + except PassboltError: + raise PassboltError( + "Not authenticated with Passbolt. " + "Please run: passbolt auth login" + ) + + def _run_passbolt(self, args: List[str], check: bool = True) -> subprocess.CompletedProcess: + """Run a Passbolt CLI command. + + Args: + args: Command arguments + check: Whether to check return code + + Returns: + Completed process result + """ + try: + result = subprocess.run( + [self.cli_path] + args, + capture_output=True, + text=True, + check=check + ) + return result + except subprocess.CalledProcessError as e: + raise PassboltError(f"Passbolt command failed: {e.stderr}") + + def get_credential(self, resource_id: str) -> PassboltCredential: + """Get a credential by resource ID. + + Args: + resource_id: UUID of the Passbolt resource + + Returns: + PassboltCredential object with username and password + """ + # Get the full resource + result = self._run_passbolt(['get', '--id', resource_id, '--json']) + + try: + data = json.loads(result.stdout) + except json.JSONDecodeError: + raise PassboltError(f"Failed to parse Passbolt response") + + # Extract fields based on resource type + credential = PassboltCredential( + resource_id=resource_id, + name=data.get('name', ''), + username=data.get('username'), + password=data.get('password'), + uri=data.get('uri'), + description=data.get('description') + ) + + # Determine resource type + if 'resource_type' in data: + try: + credential.resource_type = PassboltResourceType( + data['resource_type']) + except ValueError: + pass # Keep default + + return credential + + def get_field(self, resource_id: str, field: str) -> str: + """Get a specific field from a resource. + + Args: + resource_id: UUID of the Passbolt resource + field: Field name (e.g., 'password', 'username', 'uri') + + Returns: + Field value as string + """ + result = self._run_passbolt( + ['get', '--id', resource_id, '--field', field]) + return result.stdout.strip() + + def get_password(self, resource_id: str) -> str: + """Get just the password for a resource. + + Args: + resource_id: UUID of the Passbolt resource + + Returns: + Password string + """ + return self.get_field(resource_id, 'password') + + def get_username(self, resource_id: str) -> str: + """Get just the username for a resource. + + Args: + resource_id: UUID of the Passbolt resource + + Returns: + Username string + """ + return self.get_field(resource_id, 'username') + + def list_resources(self, folder_id: Optional[str] = None, + search: Optional[str] = None) -> List[PassboltResource]: + """List available resources. + + Args: + folder_id: Optional folder ID to filter by + search: Optional search term + + Returns: + List of PassboltResource objects + """ + args = ['list', '--json'] + + if folder_id: + args.extend(['--folder', folder_id]) + if search: + args.extend(['--filter', search]) + + result = self._run_passbolt(args) + + try: + data = json.loads(result.stdout) + except json.JSONDecodeError: + return [] + + resources = [] + for item in data: + resources.append(PassboltResource( + id=item['id'], + name=item.get('name', ''), + username=item.get('username'), + uri=item.get('uri'), + resource_type=item.get('resource_type', 'password'), + folder_parent_id=item.get('folder_parent_id'), + personal=item.get('personal', False) + )) + + return resources + + def find_resource_by_name(self, name: str) -> Optional[PassboltResource]: + """Find a resource by name. + + Args: + name: Name of the resource to find + + Returns: + First matching PassboltResource or None + """ + resources = self.list_resources(search=name) + for resource in resources: + if resource.name == name: + return resource + return None + + def create_resource(self, name: str, username: str, password: str, + uri: Optional[str] = None, + description: Optional[str] = None, + folder_id: Optional[str] = None) -> str: + """Create a new password resource. + + Args: + name: Resource name + username: Username + password: Password + uri: Optional URI/URL + description: Optional description + folder_id: Optional folder to place resource in + + Returns: + ID of created resource + """ + args = ['create', 'resource', + '--name', name, + '--username', username, + '--password', password] + + if uri: + args.extend(['--uri', uri]) + if description: + args.extend(['--description', description]) + if folder_id: + args.extend(['--folder', folder_id]) + + result = self._run_passbolt(args) + + # Parse the ID from output + # Output format: "Resource created: " + for line in result.stdout.split('\n'): + if 'created' in line.lower() and ':' in line: + parts = line.split(':', 1) + if len(parts) == 2: + return parts[1].strip() + + raise PassboltError("Failed to parse created resource ID") + + def update_resource(self, resource_id: str, + name: Optional[str] = None, + username: Optional[str] = None, + password: Optional[str] = None, + uri: Optional[str] = None, + description: Optional[str] = None) -> None: + """Update an existing resource. + + Args: + resource_id: ID of resource to update + name: New name (optional) + username: New username (optional) + password: New password (optional) + uri: New URI (optional) + description: New description (optional) + """ + args = ['update', 'resource', '--id', resource_id] + + if name: + args.extend(['--name', name]) + if username: + args.extend(['--username', username]) + if password: + args.extend(['--password', password]) + if uri: + args.extend(['--uri', uri]) + if description: + args.extend(['--description', description]) + + self._run_passbolt(args) + + def delete_resource(self, resource_id: str) -> None: + """Delete a resource. + + Args: + resource_id: ID of resource to delete + """ + self._run_passbolt(['delete', 'resource', '--id', resource_id]) + + def share_resource(self, resource_id: str, user_id: str, + permission: str = "read") -> None: + """Share a resource with another user. + + Args: + resource_id: ID of resource to share + user_id: ID of user to share with + permission: Permission level ('read', 'update', 'owner') + """ + self._run_passbolt([ + 'share', 'resource', + '--id', resource_id, + '--user', user_id, + '--permission', permission + ]) + + def list_folders(self) -> List[Dict[str, Any]]: + """List all folders. + + Returns: + List of folder dictionaries + """ + result = self._run_passbolt(['list', 'folder', '--json']) + + try: + return json.loads(result.stdout) + except json.JSONDecodeError: + return [] + + def get_folder_by_name(self, name: str) -> Optional[Dict[str, Any]]: + """Find a folder by name. + + Args: + name: Folder name to search for + + Returns: + Folder dictionary or None + """ + folders = self.list_folders() + for folder in folders: + if folder.get('name') == name: + return folder + return None + + def validate_resource_id(self, resource_id: str) -> bool: + """Check if a resource ID exists and is accessible. + + Args: + resource_id: UUID of the resource + + Returns: + True if resource exists and is accessible + """ + try: + self._run_passbolt(['get', '--id', resource_id, '--field', 'name']) + return True + except PassboltError: + return False diff --git a/services/vpn_manager.py b/services/vpn_manager.py new file mode 100644 index 0000000..fbd38f8 --- /dev/null +++ b/services/vpn_manager.py @@ -0,0 +1,311 @@ +"""VPN connection management using NetworkManager (nmcli).""" + +import subprocess +import tempfile +import os +import re +from dataclasses import dataclass +from typing import Optional, Dict, List +from enum import Enum +from pathlib import Path + + +class VPNStatus(Enum): + """VPN connection status.""" + CONNECTED = "connected" + DISCONNECTED = "disconnected" + CONNECTING = "connecting" + DISCONNECTING = "disconnecting" + FAILED = "failed" + UNKNOWN = "unknown" + + +class VPNConnectionError(Exception): + """Exception raised for VPN connection errors.""" + pass + + +@dataclass +class VPNConnection: + """Represents a NetworkManager VPN connection.""" + name: str + uuid: str + type: str + device: Optional[str] = None + state: VPNStatus = VPNStatus.UNKNOWN + vpn_type: Optional[str] = None # OpenVPN, WireGuard, etc. + + +class VPNManager: + """Manages VPN connections through NetworkManager CLI (nmcli).""" + + def __init__(self): + """Initialize VPN manager and check for nmcli availability.""" + self._check_nmcli_available() + + def _check_nmcli_available(self) -> None: + """Check if nmcli is available on the system.""" + try: + subprocess.run(['nmcli', '--version'], + capture_output=True, check=True) + except (subprocess.CalledProcessError, FileNotFoundError): + raise VPNConnectionError( + "nmcli is not available. Please install NetworkManager.") + + def _run_nmcli(self, args: List[str], check: bool = True) -> subprocess.CompletedProcess: + """Run an nmcli command with error handling.""" + try: + result = subprocess.run( + ['nmcli'] + args, + capture_output=True, + text=True, + check=check + ) + return result + except subprocess.CalledProcessError as e: + raise VPNConnectionError(f"nmcli command failed: {e.stderr}") + + def import_ovpn(self, ovpn_path: str, connection_name: Optional[str] = None) -> str: + """Import an OpenVPN configuration file. + + Args: + ovpn_path: Path to the .ovpn configuration file + connection_name: Optional custom name for the connection + + Returns: + The name of the imported connection + """ + ovpn_file = Path(ovpn_path) + if not ovpn_file.exists(): + raise VPNConnectionError( + f"OpenVPN config file not found: {ovpn_path}") + + # Import the configuration + result = self._run_nmcli([ + 'connection', 'import', 'type', 'openvpn', 'file', str(ovpn_file) + ]) + + # Extract connection name from output + # nmcli typically outputs: "Connection 'name' (uuid) successfully added." + match = re.search(r"Connection '([^']+)'", result.stdout) + if not match: + raise VPNConnectionError( + "Failed to parse imported connection name") + + imported_name = match.group(1) + + # Rename if custom name provided + if connection_name and connection_name != imported_name: + self._run_nmcli([ + 'connection', 'modify', imported_name, + 'connection.id', connection_name + ]) + return connection_name + + return imported_name + + def connect(self, connection_name: str, + username: Optional[str] = None, + password: Optional[str] = None) -> None: + """Connect to a VPN. + + Args: + connection_name: Name of the NetworkManager connection + username: Optional username for authentication + password: Optional password for authentication + """ + if username and password: + # Create temporary secrets file + with tempfile.NamedTemporaryFile(mode='w', delete=False) as f: + f.write(f"vpn.secrets.password:{password}\n") + if username: + f.write(f"vpn.data.username:{username}\n") + secrets_file = f.name + + try: + self._run_nmcli([ + 'connection', 'up', connection_name, + 'passwd-file', secrets_file + ]) + finally: + # Always clean up secrets file + os.unlink(secrets_file) + else: + # Connect without credentials (will prompt if needed) + self._run_nmcli(['connection', 'up', connection_name]) + + def disconnect(self, connection_name: str) -> None: + """Disconnect from a VPN. + + Args: + connection_name: Name of the NetworkManager connection + """ + self._run_nmcli(['connection', 'down', connection_name]) + + def get_status(self, connection_name: str) -> VPNStatus: + """Get the status of a VPN connection. + + Args: + connection_name: Name of the NetworkManager connection + + Returns: + Current status of the VPN connection + """ + result = self._run_nmcli( + ['connection', 'show', '--active'], + check=False + ) + + if connection_name in result.stdout: + # Parse the actual state + state_result = self._run_nmcli( + ['connection', 'show', connection_name], + check=False + ) + + if 'GENERAL.STATE:' in state_result.stdout: + if 'activated' in state_result.stdout: + return VPNStatus.CONNECTED + elif 'activating' in state_result.stdout: + return VPNStatus.CONNECTING + elif 'deactivating' in state_result.stdout: + return VPNStatus.DISCONNECTING + + return VPNStatus.DISCONNECTED + + def list_connections(self, vpn_only: bool = True) -> List[VPNConnection]: + """List all NetworkManager connections. + + Args: + vpn_only: If True, only return VPN connections + + Returns: + List of VPNConnection objects + """ + args = ['connection', 'show'] + if vpn_only: + args.extend(['--type', 'vpn']) + + result = self._run_nmcli(args, check=False) + + connections = [] + for line in result.stdout.strip().split('\n')[1:]: # Skip header + if not line: + continue + + parts = line.split() + if len(parts) >= 4: + name = parts[0] + uuid = parts[1] + conn_type = parts[2] + device = parts[3] if parts[3] != '--' else None + + # Get current status + status = self.get_status(name) + + connections.append(VPNConnection( + name=name, + uuid=uuid, + type=conn_type, + device=device, + state=status + )) + + return connections + + def delete_connection(self, connection_name: str) -> None: + """Delete a NetworkManager connection. + + Args: + connection_name: Name of the connection to delete + """ + self._run_nmcli(['connection', 'delete', connection_name]) + + def connection_exists(self, connection_name: str) -> bool: + """Check if a connection exists. + + Args: + connection_name: Name of the connection to check + + Returns: + True if the connection exists + """ + result = self._run_nmcli( + ['connection', 'show', connection_name], + check=False + ) + return result.returncode == 0 + + def modify_connection(self, connection_name: str, + settings: Dict[str, str]) -> None: + """Modify connection settings. + + Args: + connection_name: Name of the connection to modify + settings: Dictionary of setting key-value pairs + e.g., {'vpn.data.comp-lzo': 'yes'} + """ + for key, value in settings.items(): + self._run_nmcli([ + 'connection', 'modify', connection_name, + key, value + ]) + + def get_connection_details(self, connection_name: str) -> Dict[str, str]: + """Get detailed information about a connection. + + Args: + connection_name: Name of the connection + + Returns: + Dictionary of connection properties + """ + result = self._run_nmcli(['connection', 'show', connection_name]) + + details = {} + for line in result.stdout.strip().split('\n'): + if ':' in line: + key, value = line.split(':', 1) + details[key.strip()] = value.strip() + + return details + + def get_active_vpn_interface(self, connection_name: str) -> Optional[str]: + """Get the network interface used by an active VPN connection. + + Args: + connection_name: Name of the VPN connection + + Returns: + Interface name (e.g., 'tun0') or None if not connected + """ + if self.get_status(connection_name) != VPNStatus.CONNECTED: + return None + + details = self.get_connection_details(connection_name) + return details.get('GENERAL.DEVICES') + + def get_vpn_ip_address(self, connection_name: str) -> Optional[str]: + """Get the IP address assigned to the VPN connection. + + Args: + connection_name: Name of the VPN connection + + Returns: + IP address or None if not connected + """ + interface = self.get_active_vpn_interface(connection_name) + if not interface: + return None + + result = self._run_nmcli(['device', 'show', interface], check=False) + + for line in result.stdout.split('\n'): + if 'IP4.ADDRESS' in line and 'IP4.ADDRESS[2]' not in line: + # Format is usually "IP4.ADDRESS[1]: 10.0.0.1/24" + if ':' in line: + addr_part = line.split(':', 1)[1].strip() + if '/' in addr_part: + return addr_part.split('/')[0] + + return None