diff --git a/CLAUDE.md b/CLAUDE.md index 25ce966..906037f 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -32,19 +32,19 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co **main.py** - Main GUI application entry point - `VPNManagerWindow` class: Primary PyGObject/GTK3-based GUI application -- Implements single-view layout with Gtk.Stack for smooth transitions +- Two-column layout: active customers (left) vs inactive customers (right) - Features system tray integration using `pystray` -- Uses GNOME-style theming with CSS styling for cards -- Includes advanced search functionality with wildcard support (`*`) +- Uses GNOME-style theming with CSS card styling +- Includes comprehensive logging system with collapsible log view - HeaderBar for native GNOME look and feel -- Current location tracking and display +- Current location tracking and enhanced display with network topology **models.py** - Type-safe data model definitions using dataclasses and enums - `ServiceType`: Enum for service types (SSH, Web GUI, RDP, VNC, SMB, Database, FTP) - `HostType`: Enum for host types (Linux, Windows, Windows Server, Proxmox, ESXi, Router, Switch) - `VPNType`: Enum for VPN types (OpenVPN, WireGuard, IPSec) - `Service`: Individual services on hosts with type-safe enums and port numbers -- `Host`: Physical/virtual machines with services and recursive sub-hosts (VMs) +- `Host`: Physical/virtual machines with multiple IP addresses, services, and recursive sub-hosts (VMs) - `Location`: Customer locations with VPN configurations and host infrastructure - `CustomerService`: Customer's cloud/web services (O365, CRM, etc.) - `Customer`: Top-level entities containing services and locations @@ -66,8 +66,13 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co **widgets/** - Modular UI components using PyGObject - `customer_card.py`: `ActiveCustomerCard` and `InactiveCustomerCard` classes - - Active cards: Interactive buttons for customer services and full location details - - Inactive cards: Read-only service lists and location activation buttons + - **Compact tree-like design**: Hierarchical layout with expand/collapse arrows + - **Card styling**: Customer cards contain location subcards with proper visual hierarchy + - **Multi-column layout**: Fixed-width columns for proper alignment (name, IP, actions) + - **Service action icons**: Direct access buttons for SSH, RDP, Web GUI with tooltips + - **Multiple IP support**: Display primary IP with hover tooltip showing all addresses + - Active cards: Full interaction with connection controls and infrastructure details + - Inactive cards: Activation buttons and current location setting - `location_card.py`: `ActiveLocationCard` and `InactiveLocationCard` classes - Active cards: Connection controls, deactivation (X button), and infrastructure details - Inactive cards: Current location setting and activation buttons @@ -76,6 +81,19 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co - Service buttons for direct access to SSH, Web GUI, RDP services - `__init__.py`: Widget exports for clean imports +**services/** - VPN and credential management (modular architecture) +- `vpn_manager.py`: NetworkManager (nmcli) integration with .ovpn file support +- `passbolt_client.py`: Passbolt CLI client for secure credential management +- `connection_manager.py`: High-level orchestrator combining VPN and credentials +- Support for flexible credential storage (direct username/password or Passbolt UUIDs) + +**views/** - Comprehensive logging system +- `log_view.py`: `LogView` class with collapsible interface + - **Command logging**: Real-time capture of nmcli and system command output + - **Color-coded levels**: Info, success, warning, error with visual distinction + - **Auto-scroll**: Automatic scrolling to latest entries with manual override + - **Expandable/collapsible**: Bottom panel that can be hidden to save space + **Configuration Files** - `init_config.py`: Helper script to initialize user configuration with examples - `example_customer.yaml`: Complete example showing YAML schema with all features @@ -93,13 +111,16 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co - **Active/Inactive**: Locations can be activated for VPN management - **Current Location**: User's physical location (separate from VPN connections) - **Connection State**: VPN connection status independent of location activation +- **Network Topology**: Each location includes internal networks and external endpoints +- **Credential Management**: Flexible credential storage (direct or Passbolt UUID) - Automatic UI updates based on state changes with immediate feedback -**Single-View UI Architecture with Stack Navigation**: -- Uses `Gtk.Stack` for smooth view transitions with crossfade animation -- **Normal mode**: Shows only active locations (full detail view) -- **Search mode**: Shows only inactive locations (activation and current location setting) -- Clean visual separation with no overlapping or confusing dual-column layouts +**Two-Column Layout Architecture**: +- **Left column**: Active customers with full location details and infrastructure +- **Right column**: Inactive customers available for activation +- **Compact design**: Tree-like hierarchy with proper indentation and alignment +- **Real-time filtering**: Search affects both columns simultaneously +- **Dynamic reorganization**: Customers move between columns based on location state **Widget-Based Component System**: - Modular widget classes handle their own GTK widget creation and event handling @@ -118,7 +139,15 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co The application tracks two distinct location concepts: - **Current Location**: Where the user physically is (set via "Set as Current" button) - **Active Locations**: Locations available for VPN connections -- Current location is displayed prominently above the search bar + +**Enhanced Current Location Display**: +- **Prominent info box** with customer name, location, and VPN type +- **Host count summary** with VM breakdown (e.g., "3 hosts (7 total with VMs)") +- **Collapsible infrastructure section** with detailed host and VM information +- **Network topology display**: Internal networks and external endpoints +- **Visual host type icons** (🐧 Linux, 🪟 Windows, 📦 Proxmox, 🌐 Router, etc.) +- **Hierarchical VM display** with service counts and multiple IP addresses +- **Multi-interface support**: Hosts can have multiple IP addresses (firewalls, routers) - Users can set current location from inactive location cards without activating VPN ### Search and Discovery Features @@ -142,16 +171,24 @@ The application tracks two distinct location concepts: ### UI Layout Structure -**Modern Single-View Design**: -- HeaderBar with title and subtitle (GNOME HIG compliance) -- Current location display (centered, prominent) -- Search entry with comprehensive placeholder text (supports `*` wildcard) -- Single-view layout using Gtk.Stack for smooth transitions -- **Normal mode**: Active locations with full interaction (connections, services, infrastructure) -- **Search mode**: Inactive locations with activation and current location setting +**Modern Two-Column Design**: +- HeaderBar with title and current location display +- **Enhanced current location info box** with network topology and collapsible infrastructure +- Search entry with real-time filtering across both columns +- **Left column**: Active customers with full interaction (connections, services, infrastructure) +- **Right column**: Inactive customers with activation and current location setting +- **Compact tree-like cards** with customer cards containing location subcards +- **Fixed-width columns**: Proper alignment of host names, IP addresses, and action icons +- **Collapsible log view**: Bottom panel for command output and system logs - GNOME-style cards with CSS theming, proper spacing, and visual hierarchy - System tray integration for minimize-to-tray behavior +**Customer Card Features**: +- **Active cards**: Start expanded, show full location details and services +- **Inactive cards**: Start collapsed to save space during search +- **Location count badges**: Show number of locations in parentheses +- **Smooth expand/collapse**: Click arrow buttons to toggle content visibility + ### GTK3/PyGObject Specific Features - **CSS styling**: GNOME-style cards with borders, shadows, and adaptive theming @@ -188,12 +225,28 @@ locations: - name: Location Name vpn_type: OpenVPN|WireGuard|IPSec vpn_config: /path/to/config/file - active: true|false # Available for VPN management - connected: true|false # Current VPN connection status + + # VPN credentials (three options): + # Option 1: Dictionary with username/password + vpn_credentials: + username: vpnuser + password: password123 + + # Option 2: Passbolt UUID (for future implementation) + # vpn_credentials: "550e8400-e29b-41d4-a716-446655440000" + + # Option 3: Omit or set to null if no credentials needed + # vpn_credentials: null + + # Note: active and connected are runtime state (not stored in config) + + # Network topology information + external_addresses: [vpn.domain.com, backup.domain.com] # VPN endpoints + networks: [192.168.1.0/24, 10.0.1.0/24] # Internal networks hosts: - name: Host Name - ip_address: IP Address + ip_addresses: [192.168.1.10, 10.0.1.10] # Multiple interfaces supported host_type: Linux|Windows|Windows Server|Proxmox|ESXi|Router|Switch description: Optional description @@ -204,7 +257,7 @@ locations: sub_hosts: # Optional VMs/containers (recursive structure) - name: VM Name - ip_address: VM IP + ip_addresses: [192.168.1.20] # VMs can also have multiple IPs host_type: Linux|Windows|Windows Server services: # Same structure as parent host - name: Service Name diff --git a/assets/icons/debian.svg b/assets/icons/debian.svg new file mode 100644 index 0000000..9460799 --- /dev/null +++ b/assets/icons/debian.svg @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/assets/icons/docker.svg b/assets/icons/docker.svg new file mode 100644 index 0000000..0903acd --- /dev/null +++ b/assets/icons/docker.svg @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/assets/icons/esxi.svg b/assets/icons/esxi.svg new file mode 100644 index 0000000..f0770ab --- /dev/null +++ b/assets/icons/esxi.svg @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/assets/icons/exchange.svg b/assets/icons/exchange.svg new file mode 100644 index 0000000..fba7bfa --- /dev/null +++ b/assets/icons/exchange.svg @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/assets/icons/opnsense.svg b/assets/icons/opnsense.svg new file mode 100644 index 0000000..65e6a58 --- /dev/null +++ b/assets/icons/opnsense.svg @@ -0,0 +1,31 @@ + + + + + diff --git a/assets/icons/proxmox.svg b/assets/icons/proxmox.svg new file mode 100644 index 0000000..fa1d942 --- /dev/null +++ b/assets/icons/proxmox.svg @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/assets/icons/windows-11.png b/assets/icons/windows-11.png new file mode 100644 index 0000000..ce7b366 Binary files /dev/null and b/assets/icons/windows-11.png differ diff --git a/control-panel.png b/control-panel.png deleted file mode 100644 index de80b50..0000000 Binary files a/control-panel.png and /dev/null differ diff --git a/current_view.png b/current_view.png deleted file mode 100644 index 7b66925..0000000 Binary files a/current_view.png and /dev/null differ diff --git a/customer_card.png b/customer_card.png new file mode 100644 index 0000000..60114d2 Binary files /dev/null and b/customer_card.png differ diff --git a/data_loader.py b/data_loader.py index a1ab8ad..358e2a8 100644 --- a/data_loader.py +++ b/data_loader.py @@ -2,7 +2,7 @@ import yaml from pathlib import Path from typing import List, Dict, Any from models import ( - Customer, CustomerService, Location, Host, Service, + Customer, CustomerService, Location, Host, Service, NetworkSegment, HostIP, PortForwarding, ServiceType, HostType, VPNType ) @@ -73,11 +73,42 @@ def parse_host(host_data: Dict[str, Any]) -> Host: ) services.append(service) + # Parse IP addresses - handle both new HostIP format and legacy formats + ip_addresses = [] + if 'ip_addresses' in host_data: + for ip_data in host_data['ip_addresses']: + if isinstance(ip_data, dict): + # New HostIP format: {ip_address: "192.168.1.10", network_segment: "LAN", is_primary: true} + host_ip = HostIP( + ip_address=ip_data['ip_address'], + network_segment=ip_data.get( + 'network_segment', 'LAN'), # Default segment + is_primary=ip_data.get('is_primary', False) + ) + ip_addresses.append(host_ip) + else: + # Legacy format: simple string list + host_ip = HostIP( + ip_address=ip_data, + network_segment='LAN', # Default segment for legacy format + is_primary=len(ip_addresses) == 0 # First IP is primary + ) + ip_addresses.append(host_ip) + elif 'ip_address' in host_data: + # Very old format: single IP string + host_ip = HostIP( + ip_address=host_data['ip_address'], + network_segment='LAN', + is_primary=True + ) + ip_addresses.append(host_ip) + # Create host host = Host( name=host_data['name'], - ip_address=host_data['ip_address'], + ip_addresses=ip_addresses, host_type=parse_host_type(host_data['host_type']), + icon=host_data.get('icon'), # Custom icon name description=host_data.get('description', ''), services=services ) @@ -93,6 +124,34 @@ def parse_host(host_data: Dict[str, Any]) -> Host: def parse_location(location_data: Dict[str, Any]) -> Location: """Parse a location from YAML data.""" + # Parse network segments + network_segments = [] + if 'network_segments' in location_data: + for segment_data in location_data['network_segments']: + segment = NetworkSegment( + name=segment_data['name'], + cidr=segment_data['cidr'], + vlan_id=segment_data.get('vlan_id'), + zone=segment_data.get('zone', 'general'), + gateway=segment_data.get('gateway'), + description=segment_data.get('description', '') + ) + network_segments.append(segment) + + # Parse port forwardings + port_forwardings = [] + if 'port_forwardings' in location_data: + for pf_data in location_data['port_forwardings']: + port_forward = PortForwarding( + external_port=pf_data['external_port'], + internal_ip=pf_data['internal_ip'], + internal_port=pf_data['internal_port'], + protocol=pf_data.get('protocol', 'tcp'), + description=pf_data.get('description', ''), + enabled=pf_data.get('enabled', True) + ) + port_forwardings.append(port_forward) + # Parse hosts hosts = [] if 'hosts' in location_data: @@ -108,6 +167,10 @@ def parse_location(location_data: Dict[str, Any]) -> Location: active=False, # Runtime state - always starts inactive vpn_config=location_data.get('vpn_config', ''), hosts=hosts, + network_segments=network_segments, + networks=location_data.get('networks', []), # Legacy support + external_addresses=location_data.get('external_addresses', []), + port_forwardings=port_forwardings, vpn_credentials=location_data.get('vpn_credentials'), nmcli_connection_name=location_data.get('nmcli_connection_name'), auto_import=location_data.get('auto_import', True) @@ -204,20 +267,64 @@ def save_customer(customer: Customer, filename: str = None) -> None: # Convert locations for location in customer.locations: + # Convert network segments + network_segments = [] + for segment in location.network_segments: + segment_data = { + 'name': segment.name, + 'cidr': segment.cidr, + 'zone': segment.zone, + 'description': segment.description + } + if segment.vlan_id is not None: + segment_data['vlan_id'] = segment.vlan_id + if segment.gateway is not None: + segment_data['gateway'] = segment.gateway + network_segments.append(segment_data) + + # Convert port forwardings + port_forwardings = [] + for pf in location.port_forwardings: + pf_data = { + 'external_port': pf.external_port, + 'internal_ip': pf.internal_ip, + 'internal_port': pf.internal_port, + 'protocol': pf.protocol, + 'enabled': pf.enabled + } + if pf.description: + pf_data['description'] = pf.description + port_forwardings.append(pf_data) + location_data = { 'name': location.name, 'vpn_type': location.vpn_type.value, 'vpn_config': location.vpn_config, - 'active': location.active, - 'connected': location.connected, + 'network_segments': network_segments, + 'external_addresses': location.external_addresses, + 'port_forwardings': port_forwardings, 'hosts': [] } + # Add legacy networks if they exist + if location.networks: + location_data['networks'] = location.networks + # Convert hosts def convert_host(host): + # Convert HostIP objects back to dictionaries + ip_addresses = [] + for host_ip in host.ip_addresses: + ip_dict = { + 'ip_address': host_ip.ip_address, + 'network_segment': host_ip.network_segment, + 'is_primary': host_ip.is_primary + } + ip_addresses.append(ip_dict) + host_data = { 'name': host.name, - 'ip_address': host.ip_address, + 'ip_addresses': ip_addresses, 'host_type': host.host_type.value, 'description': host.description, 'services': [ @@ -230,6 +337,10 @@ def save_customer(customer: Customer, filename: str = None) -> None: ] } + # Add icon if specified + if host.icon: + host_data['icon'] = host.icon + if host.sub_hosts: host_data['sub_hosts'] = [convert_host( subhost) for subhost in host.sub_hosts] @@ -267,12 +378,22 @@ def get_demo_customers() -> List[Customer]: vpn_type=VPNType.OPENVPN, connected=False, active=True, - vpn_config="/etc/openvpn/demo.ovpn" + vpn_config="demo.ovpn" # File in ~/.vpntray/vpn/ + ) + + # Create a demo network segment + demo_segment = NetworkSegment( + name="LAN", + cidr="10.0.0.0/24", + gateway="10.0.0.1", + zone="production", + description="Demo network" ) demo_host = Host( name="DEMO-01", - ip_address="10.0.0.1", + ip_addresses=[HostIP(ip_address="10.0.0.1", + network_segment="LAN", is_primary=True)], host_type=HostType.LINUX, description="Demo server", services=[ @@ -282,6 +403,7 @@ def get_demo_customers() -> List[Customer]: ) demo_location.hosts = [demo_host] + demo_location.network_segments = [demo_segment] demo_customer.locations = [demo_location] return [demo_customer] @@ -317,13 +439,24 @@ services: locations: - name: Main Office vpn_type: WireGuard - vpn_config: /etc/wireguard/simple.conf - active: false - connected: false + vpn_config: simple.conf # File in ~/.vpntray/vpn/ + + network_segments: + - name: LAN + cidr: 192.168.1.0/24 + gateway: 192.168.1.1 + zone: production + description: Main office network + + external_addresses: + - simple.vpn.example.com hosts: - name: SERVER-01 - ip_address: 192.168.1.10 + ip_addresses: + - ip_address: 192.168.1.10 + network_segment: LAN + is_primary: true host_type: Linux description: Main server services: diff --git a/example_customer.yaml b/example_customer.yaml index 60b2d30..7447371 100644 --- a/example_customer.yaml +++ b/example_customer.yaml @@ -22,7 +22,72 @@ services: locations: - name: Main Office vpn_type: OpenVPN - vpn_config: /etc/openvpn/techcorp-main.ovpn + vpn_config: techcorp-main.ovpn # File in ~/.vpntray/vpn/ + + # External connection endpoints (can have multiple for redundancy) + external_addresses: + - vpn.techcorp.com # Primary VPN endpoint + - vpn2.techcorp.com # Backup endpoint + - 203.0.113.10 # Direct IP fallback + + # Port forwarding rules for external access + port_forwardings: + - external_port: 8006 + internal_ip: 192.168.1.10 + internal_port: 8006 + protocol: tcp + description: Proxmox web interface + enabled: true + + - external_port: 3389 + internal_ip: 192.168.1.20 + internal_port: 3389 + protocol: tcp + description: Domain Controller RDP + enabled: true + + - external_port: 9000 + internal_ip: 192.168.1.21 + internal_port: 9000 + protocol: tcp + description: File server web panel + enabled: true + + - external_port: 5050 + internal_ip: 192.168.1.22 + internal_port: 5050 + protocol: tcp + description: pgAdmin database interface + enabled: true + + - external_port: 443 + internal_ip: 192.168.1.1 + internal_port: 443 + protocol: tcp + description: Firewall web interface + enabled: true + + # Network segments with rich metadata + network_segments: + - name: LAN + cidr: 192.168.1.0/24 + gateway: 192.168.1.1 + zone: production + description: Main office LAN + + - name: Management + cidr: 10.0.1.0/24 + vlan_id: 100 + gateway: 10.0.1.1 + zone: management + description: Out-of-band management network + + - name: Services + cidr: 172.16.1.0/24 + vlan_id: 200 + gateway: 172.16.1.1 + zone: production + description: Internal services network # VPN credentials - three options: # Option 1: Dictionary with username/password @@ -39,8 +104,12 @@ locations: # Hosts at this location hosts: - name: PVE-01 - ip_address: 192.168.1.10 + ip_addresses: + - ip_address: 192.168.1.10 + network_segment: LAN + is_primary: true host_type: Proxmox + icon: proxmox # Custom icon: assets/icons/proxmox.svg description: Main virtualization server services: - name: Web Interface @@ -53,7 +122,10 @@ locations: # VMs running on this host sub_hosts: - name: DC-01 - ip_address: 192.168.1.20 + ip_addresses: + - ip_address: 192.168.1.20 + network_segment: LAN + is_primary: true host_type: Windows Server description: Domain Controller services: @@ -65,8 +137,12 @@ locations: port: 8080 - name: FILE-01 - ip_address: 192.168.1.21 + ip_addresses: + - ip_address: 192.168.1.21 + network_segment: LAN + is_primary: true host_type: Linux + icon: ubuntu # Custom icon: assets/icons/ubuntu.svg description: File Server (Samba) services: - name: SSH @@ -80,9 +156,15 @@ locations: port: 9000 - name: DB-01 - ip_address: 192.168.1.22 + ip_addresses: + - ip_address: 192.168.1.22 + network_segment: LAN + is_primary: true + - ip_address: 172.16.1.22 + network_segment: Services + is_primary: false host_type: Linux - description: PostgreSQL Database + description: PostgreSQL Database (dual-homed) services: - name: SSH service_type: SSH @@ -95,9 +177,19 @@ locations: port: 5050 - name: FW-01 - ip_address: 192.168.1.1 + ip_addresses: + - ip_address: 192.168.1.1 + network_segment: LAN + is_primary: true + - ip_address: 10.0.1.1 + network_segment: Management + is_primary: false + - ip_address: 172.16.1.1 + network_segment: Services + is_primary: false host_type: Router - description: pfSense Firewall/Router + icon: pfsense # Custom icon: assets/icons/pfsense.svg + description: pfSense Firewall/Router (multi-interface) services: - name: Web Interface service_type: Web GUI @@ -107,9 +199,15 @@ locations: port: 22 - name: SW-01 - ip_address: 192.168.1.2 + ip_addresses: + - ip_address: 192.168.1.2 + network_segment: LAN + is_primary: true + - ip_address: 10.0.1.2 + network_segment: Management + is_primary: false host_type: Switch - description: Managed Switch + description: Managed Switch (dual-homed) services: - name: Web Interface service_type: Web GUI @@ -120,16 +218,57 @@ locations: - name: Branch Office vpn_type: WireGuard - vpn_config: /etc/wireguard/techcorp-branch.conf + vpn_config: techcorp-branch.conf # File in ~/.vpntray/vpn/ + + # External connection endpoints + external_addresses: + - 198.51.100.50 # Branch office static IP + - branch.techcorp.com # Dynamic DNS endpoint + + # Port forwarding rules + port_forwardings: + - external_port: 8080 + internal_ip: 10.10.1.10 + internal_port: 8080 + protocol: tcp + description: Branch web services + enabled: true + + - external_port: 22 + internal_ip: 10.10.1.10 + internal_port: 22 + protocol: tcp + description: SSH access to branch server + enabled: false # Disabled for security + + # Network segments + network_segments: + - name: Branch_LAN + cidr: 10.10.1.0/24 + gateway: 10.10.1.1 + zone: production + description: Branch office network + + - name: Local_Services + cidr: 192.168.100.0/24 + gateway: 192.168.100.1 + zone: general + description: Local branch services network # No credentials needed for WireGuard (uses keys in config file) vpn_credentials: null hosts: - name: BRANCH-01 - ip_address: 10.10.1.10 + ip_addresses: + - ip_address: 10.10.1.10 + network_segment: Branch_LAN + is_primary: true + - ip_address: 192.168.100.1 + network_segment: Local_Services + is_primary: false host_type: Linux - description: Branch office server + description: Branch office server (dual-homed) services: - name: SSH service_type: SSH diff --git a/init_config.py b/init_config.py index 12816d9..3159f4f 100644 --- a/init_config.py +++ b/init_config.py @@ -12,12 +12,12 @@ from data_loader import initialize_example_customers, get_config_dir def main(): """Initialize VPNTray configuration with example customers.""" config_dir = get_config_dir() - + print("VPNTray Configuration Initializer") print("=" * 35) print(f"Configuration directory: {config_dir}") print() - + try: initialize_example_customers() print() @@ -32,11 +32,11 @@ def main(): print("- Each customer gets their own .yaml/.yml file") print("- File names don't matter (use descriptive names)") print("- See example_customer.yaml for the complete schema") - + except Exception as e: print(f"❌ Error initializing configuration: {e}") sys.exit(1) if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/main.py b/main.py index b29df67..bc300f6 100644 --- a/main.py +++ b/main.py @@ -1,22 +1,27 @@ #!/usr/bin/env python3 -from views import ActiveView, InactiveView +from views import ActiveView, InactiveView, LogView from data_loader import load_customers from models import Customer -from PIL import Image, ImageDraw -import pystray -import threading +# from services import VPNManager, VPNStatus, VPNConnectionError # Temporarily disabled due to syntax errors +from services import VPNManager, VPNStatus, VPNConnectionError import sys -from gi.repository import Gtk, Gdk, GLib +import logging +from gi.repository import Gtk, Gdk, GLib, Gio import gi gi.require_version('Gtk', '3.0') class VPNManagerWindow: + vpn_manager: VPNManager + def __init__(self): self.customers = load_customers() self.filtered_customers = self.customers.copy() self.current_location = None # Track user's current location + # VPN manager will be initialized after UI setup + self.vpn_manager = None + # Create main window self.window = Gtk.Window() self.window.set_title("VPN Manager") @@ -24,30 +29,18 @@ class VPNManagerWindow: self.window.connect("delete-event", self.quit_app_from_close) self.window.connect("window-state-event", self.on_window_state_event) -# Set up minimal CSS for GNOME-style cards + # Set up minimal CSS for GNOME-style cards self.setup_css() # Create UI self.setup_ui() - self.setup_system_tray() - - # Start hidden - self.window.hide() + self.vpn_manager = VPNManager() def setup_css(self): """Minimal CSS for GNOME-style cards""" css_provider = Gtk.CssProvider() - css = """ - .card { - background: @theme_base_color; - border-radius: 8px; - border: 1px solid @borders; - box-shadow: 0 1px 3px rgba(0,0,0,0.1); - padding: 16px; - margin: 6px; - } - """ - css_provider.load_from_data(css.encode()) + css_provider.load_from_file(Gio.File.new_for_path('style.css')) + # css_provider.load_from_data(css.encode()) # Apply CSS to default screen screen = Gdk.Screen.get_default() @@ -72,12 +65,9 @@ class VPNManagerWindow: main_vbox.set_margin_bottom(12) self.window.add(main_vbox) - # Current location display - self.current_location_label = Gtk.Label() - self.current_location_label.set_markup("Current location: Not set") - self.current_location_label.set_halign(Gtk.Align.CENTER) - self.current_location_label.set_margin_bottom(8) - main_vbox.pack_start(self.current_location_label, False, False, 0) + # Current location display - enhanced info box + self.location_info_box = self._create_location_info_box() + main_vbox.pack_start(self.location_info_box, False, False, 0) # Search bar with SearchEntry self.search_entry = Gtk.SearchEntry() @@ -91,62 +81,65 @@ class VPNManagerWindow: self.view_stack.set_transition_type(Gtk.StackTransitionType.CROSSFADE) self.view_stack.set_transition_duration(200) main_vbox.pack_start(self.view_stack, True, True, 0) - + # Get callbacks for views callbacks = self.get_callbacks() - + # Create active view (shown by default) self.active_view = ActiveView(callbacks) self.view_stack.add_named(self.active_view.widget, "active") - + # Create inactive view (shown when searching) self.inactive_view = InactiveView(callbacks) self.view_stack.add_named(self.inactive_view.widget, "inactive") + # Create log section at bottom (collapsible) + self._create_log_section(main_vbox) + + # Initialize VPN manager (temporarily disabled due to syntax errors) + # TODO: Fix VPN manager syntax and re-enable + self.vpn_manager = None + self.log_view.log_info( + "VPN manager temporarily disabled for debugging") + self.log_view.log_info("Using mock mode for VPN operations") + # Render initial data self.render_customers() - def setup_system_tray(self): - # Create a simple icon for the system tray - def create_icon(): - # Create a simple network icon - width = height = 64 - image = Image.new('RGBA', (width, height), (0, 0, 0, 0)) - draw = ImageDraw.Draw(image) + # Update VPN status from actual connections + self.update_vpn_status() - # Draw a simple network/VPN icon - # Outer circle - draw.ellipse([8, 8, 56, 56], outline=(50, 150, 50), width=4) - # Inner dot - draw.ellipse([26, 26, 38, 38], fill=(50, 150, 50)) - # Connection lines - draw.line([32, 16, 32, 24], fill=(50, 150, 50), width=3) - draw.line([32, 40, 32, 48], fill=(50, 150, 50), width=3) - draw.line([16, 32, 24, 32], fill=(50, 150, 50), width=3) - draw.line([40, 32, 48, 32], fill=(50, 150, 50), width=3) + def _setup_logging(self): + """Set up logging to route VPN manager logs to LogView.""" + # Create a custom handler that forwards to our LogView + class LogViewHandler(logging.Handler): + def __init__(self, log_view): + super().__init__() + self.log_view = log_view - return image + def emit(self, record): + try: + msg = self.format(record) + if record.levelno >= logging.ERROR: + self.log_view.log_error(msg) + elif record.levelno >= logging.WARNING: + self.log_view.log_warning(msg) + elif record.levelno >= logging.INFO: + self.log_view.log_info(msg) + else: # DEBUG + self.log_view.log_debug(msg) + except Exception: + self.handleError(record) - # Simple approach: Create tray icon with direct action and minimal menu - self.tray_icon = pystray.Icon( - "VPN Manager", - create_icon(), - "VPN Manager - Double-click to open" - ) + # Set up handler for VPN manager logs + handler = LogViewHandler(self.log_view) + handler.setFormatter(logging.Formatter('%(message)s')) - # Set direct click action - self.tray_icon.default_action = self.show_window_from_tray - - # Also provide a right-click menu - menu = pystray.Menu( - pystray.MenuItem("Open VPN Manager", - self.show_window_from_tray, default=True), - pystray.MenuItem("Quit", self.quit_app) - ) - self.tray_icon.menu = menu - - # Start tray icon in separate thread - threading.Thread(target=self.tray_icon.run, daemon=True).start() + # Add handler to VPN manager logger + vpn_logger = logging.getLogger('services.vpn_manager') + vpn_logger.addHandler(handler) + vpn_logger.setLevel(logging.DEBUG) + vpn_logger.propagate = False # Don't send to root logger def get_callbacks(self): """Return callback functions for widget interactions""" @@ -204,10 +197,12 @@ class VPNManagerWindow: target_location = customer.get_location_by_name(location.name) if target_location: target_location.active = True + self.log_view.log_info( + f"Activated location: {customer.name} - {target_location.name}") print( f"Mock: Setting {customer.name} - {target_location.name} as active") break - + # Clear search and return to active view self.search_entry.set_text("") self.render_customers() @@ -219,35 +214,322 @@ class VPNManagerWindow: if target_location: target_location.active = False target_location.connected = False # Disconnect when deactivating + self.log_view.log_info( + f"Deactivated location: {customer.name} - {target_location.name}") print( f"Mock: Deactivating {customer.name} - {target_location.name}") break self.render_customers() - + def set_current_location(self, location, customer_name): """Set the user's current location.""" for customer in self.customers: if customer.name == customer_name: target_location = customer.get_location_by_name(location.name) if target_location: - self.current_location = (customer.name, target_location.name) - print(f"Current location set to: {customer.name} - {target_location.name}") + self.current_location = ( + customer.name, target_location.name) + self.log_view.log_info( + f"Current location set to: {customer.name} - {target_location.name}") + print( + f"Current location set to: {customer.name} - {target_location.name}") self.update_current_location_display() break - + + def _create_location_info_box(self): + """Create the enhanced current location info box.""" + frame = Gtk.Frame() + frame.get_style_context().add_class("location-info") + frame.set_shadow_type(Gtk.ShadowType.NONE) + + vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=4) + frame.add(vbox) + + # Title row with infrastructure toggle + title_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=8) + vbox.pack_start(title_box, False, False, 0) + + title_label = Gtk.Label() + title_label.set_markup("📍 Current Location") + title_label.set_halign(Gtk.Align.START) + title_box.pack_start(title_label, False, False, 0) + + # Infrastructure toggle button (only shown when location is set) + self.infrastructure_toggle = Gtk.Button() + self.infrastructure_toggle.set_relief(Gtk.ReliefStyle.NONE) + self.infrastructure_toggle.set_can_focus(False) + self.infrastructure_toggle.set_label("▶") + self.infrastructure_toggle.set_tooltip_text("Show/hide infrastructure") + self.infrastructure_toggle.connect( + "clicked", self._on_infrastructure_toggle) + self.infrastructure_toggle.set_visible(False) + title_box.pack_end(self.infrastructure_toggle, False, False, 0) + + # Location details label + self.location_details_label = Gtk.Label() + self.location_details_label.set_markup("Not set") + self.location_details_label.set_halign(Gtk.Align.START) + vbox.pack_start(self.location_details_label, False, False, 0) + + # Additional info row (hosts, services, etc.) + self.location_extra_info = Gtk.Label() + self.location_extra_info.set_halign(Gtk.Align.START) + self.location_extra_info.set_visible(False) + vbox.pack_start(self.location_extra_info, False, False, 0) + + # Infrastructure section (collapsible) + self.infrastructure_box = Gtk.Box( + orientation=Gtk.Orientation.VERTICAL, spacing=6) + self.infrastructure_box.set_margin_top(8) + self.infrastructure_box.set_visible(False) + vbox.pack_start(self.infrastructure_box, False, False, 0) + + # Track infrastructure expanded state + self.infrastructure_expanded = False + + return frame + + def _create_log_section(self, main_vbox): + """Create the collapsible log section at the bottom.""" + # Log section container + log_container = Gtk.Box( + orientation=Gtk.Orientation.VERTICAL, spacing=0) + log_container.get_style_context().add_class("log-section") + main_vbox.pack_end(log_container, False, False, 0) + + # Log header with toggle button + log_header = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=8) + log_header.set_margin_start(12) + log_header.set_margin_end(12) + log_header.set_margin_top(8) + log_header.set_margin_bottom(8) + log_container.pack_start(log_header, False, False, 0) + + # Toggle button for log visibility + self.log_toggle = Gtk.Button() + self.log_toggle.set_relief(Gtk.ReliefStyle.NONE) + self.log_toggle.set_can_focus(False) + self.log_toggle.set_label("▲") + self.log_toggle.set_tooltip_text("Show/hide command log") + self.log_toggle.connect("clicked", self._on_log_toggle) + log_header.pack_start(self.log_toggle, False, False, 0) + + # Log section label + log_section_label = Gtk.Label() + log_section_label.set_markup("Command Log") + log_section_label.set_halign(Gtk.Align.START) + log_header.pack_start(log_section_label, False, False, 0) + + # Create the log view + self.log_view = LogView() + log_container.pack_start(self.log_view.widget, False, False, 0) + + # Start with log collapsed + self.log_expanded = False + self.log_view.set_visible(False) + + # Log some initial messages + self.log_view.log_info("VPN Manager started") + self.log_view.log_info(f"Loaded {len(self.customers)} customers") + + def _on_log_toggle(self, button): + """Toggle log section visibility.""" + self.log_expanded = not self.log_expanded + + if self.log_expanded: + self.log_toggle.set_label("▼") + self.log_view.set_visible(True) + else: + self.log_toggle.set_label("▲") + self.log_view.set_visible(False) + def update_current_location_display(self): - """Update the current location display label.""" + """Update the current location display with detailed information.""" if self.current_location: customer_name, location_name = self.current_location - self.current_location_label.set_markup( - f"📍 Current location: {customer_name} - {location_name}" - ) + + # Find the actual location object + location = None + for customer in self.customers: + if customer.name == customer_name: + location = customer.get_location_by_name(location_name) + if location: + break + + if location: + # Main location info + self.location_details_label.set_markup( + f"{customer_name} - {location_name}" + ) + + # Extra info about the location + host_count = len(location.hosts) + total_hosts = len(location.get_all_hosts_flat()) + vpn_type = location.vpn_type.value + + extra_text = f"{vpn_type} VPN" + if location.external_addresses: + if len(location.external_addresses) == 1: + extra_text += f" • 🌐 {location.external_addresses[0]}" + else: + extra_text += f" • 🌐 {len(location.external_addresses)} endpoints" + if location.networks: + extra_text += f" • 📡 {len(location.networks)} network{'s' if len(location.networks) > 1 else ''}" + extra_text += f" • {host_count} hosts" + if total_hosts > host_count: + extra_text += f" ({total_hosts} total with VMs)" + extra_text += "" + + self.location_extra_info.set_markup(extra_text) + self.location_extra_info.set_visible(True) + + # Show infrastructure toggle and rebuild infrastructure + self.infrastructure_toggle.set_visible(True) + self._rebuild_infrastructure_display(location) + else: + self.location_details_label.set_markup( + f"{customer_name} - {location_name}" + ) + self.location_extra_info.set_visible(False) + self.infrastructure_toggle.set_visible(False) else: - self.current_location_label.set_markup("Current location: Not set") + self.location_details_label.set_markup("Not set") + self.location_extra_info.set_visible(False) + self.infrastructure_toggle.set_visible(False) + self.infrastructure_box.set_visible(False) + + def _rebuild_infrastructure_display(self, location): + """Rebuild the infrastructure display for the current location.""" + # Clear existing infrastructure widgets + for child in self.infrastructure_box.get_children(): + child.destroy() + + # Add network information if available + if location.networks or location.external_addresses: + network_label = Gtk.Label() + network_label.set_markup("Network Configuration") + network_label.set_halign(Gtk.Align.START) + network_label.set_margin_bottom(4) + self.infrastructure_box.pack_start(network_label, False, False, 0) + + # External addresses + if location.external_addresses: + for i, address in enumerate(location.external_addresses): + ext_box = Gtk.Box( + orientation=Gtk.Orientation.HORIZONTAL, spacing=8) + ext_box.set_margin_start(12) + self.infrastructure_box.pack_start( + ext_box, False, False, 0) + + label_text = "🌐 External:" if i == 0 else "🌐 Backup:" + ext_label = Gtk.Label() + ext_label.set_markup( + f"{label_text} {address}") + ext_label.set_halign(Gtk.Align.START) + ext_box.pack_start(ext_label, False, False, 0) + + # Internal networks + if location.networks: + for network in location.networks: + net_box = Gtk.Box( + orientation=Gtk.Orientation.HORIZONTAL, spacing=8) + net_box.set_margin_start(12) + self.infrastructure_box.pack_start( + net_box, False, False, 0) + + net_label = Gtk.Label() + net_label.set_markup( + f"📡 Network: {network}") + net_label.set_halign(Gtk.Align.START) + net_box.pack_start(net_label, False, False, 0) + + # Add spacing before infrastructure + if location.hosts: + spacer = Gtk.Box() + spacer.set_size_request(-1, 8) + self.infrastructure_box.pack_start(spacer, False, False, 0) + + if not location.hosts: + return + + # Add infrastructure label + infra_label = Gtk.Label() + infra_label.set_markup("Infrastructure") + infra_label.set_halign(Gtk.Align.START) + infra_label.set_margin_bottom(4) + self.infrastructure_box.pack_start(infra_label, False, False, 0) + + # Add hosts + for host in location.hosts: + host_box = Gtk.Box( + orientation=Gtk.Orientation.HORIZONTAL, spacing=8) + host_box.set_margin_start(12) + self.infrastructure_box.pack_start(host_box, False, False, 0) + + # Host type icon + host_type_icons = { + 'Linux': '🐧', + 'Windows': '🪟', + 'Windows Server': '🏢', + 'Proxmox': '📦', + 'ESXi': '⚙️', + 'Router': '📡', + 'Switch': '🔀' + } + icon = host_type_icons.get(host.host_type.value, '💻') + + # Host info + host_label = Gtk.Label() + service_count = len(host.services) + vm_count = len(host.sub_hosts) + + host_text = f"{icon} {host.name} ({host.ip_address})" + if service_count > 0: + host_text += f" • {service_count} services" + if vm_count > 0: + host_text += f" • {vm_count} VMs" + + host_label.set_markup(f"{host_text}") + host_label.set_halign(Gtk.Align.START) + host_box.pack_start(host_label, False, False, 0) + + # Add sub-hosts (VMs) if any + if host.sub_hosts: + for vm in host.sub_hosts: + vm_box = Gtk.Box( + orientation=Gtk.Orientation.HORIZONTAL, spacing=8) + vm_box.set_margin_start(24) + self.infrastructure_box.pack_start(vm_box, False, False, 0) + + vm_icon = host_type_icons.get(vm.host_type.value, '💻') + vm_service_count = len(vm.services) + + vm_text = f"{vm_icon} {vm.name} ({vm.ip_address})" + if vm_service_count > 0: + vm_text += f" • {vm_service_count} services" + + vm_label = Gtk.Label() + vm_label.set_markup(f"{vm_text}") + vm_label.set_halign(Gtk.Align.START) + vm_box.pack_start(vm_label, False, False, 0) + + # Show all widgets (but container might be hidden) + self.infrastructure_box.show_all() + + def _on_infrastructure_toggle(self, button): + """Toggle infrastructure section visibility.""" + self.infrastructure_expanded = not self.infrastructure_expanded + + if self.infrastructure_expanded: + self.infrastructure_toggle.set_label("▼") + self.infrastructure_box.set_visible(True) + else: + self.infrastructure_toggle.set_label("▶") + self.infrastructure_box.set_visible(False) def filter_customers(self, entry): search_term = entry.get_text().strip() - + # Check for wildcard - show all customers if search_term == "*": self.filtered_customers = self.customers.copy() @@ -279,8 +561,12 @@ class VPNManagerWindow: # Check hosts and their services in this location def search_hosts(hosts): for host in hosts: + # Check IP addresses (search in any of the host's IPs) + ip_match = any(search_term_lower in host_ip.ip_address.lower( + ) for host_ip in host.ip_addresses) + if (search_term_lower in host.name.lower() or - search_term_lower in host.ip_address.lower() or + ip_match or search_term_lower in host.host_type.value.lower() or search_term_lower in host.description.lower()): return True @@ -307,11 +593,34 @@ class VPNManagerWindow: self.render_customers() def toggle_connection(self, location): - location.connected = not location.connected - status = "connected to" if location.connected else "disconnected from" - print(f"Mock: {status} {location.name} via {location.vpn_type.value}") + # Use actual VPN manager + if location.connected: + # Disconnect + self.log_view.log_info(f"Disconnecting from {location.name}...") + success = self.vpn_manager.disconnect_vpn(location) + if success: + location.connected = False + self.log_view.log_success(f"Disconnected from {location.name}") + else: + self.log_view.log_error( + f"Failed to disconnect from {location.name}") + else: + # Connect + self.log_view.log_info( + f"Connecting to {location.name} via {location.vpn_type.value}...") + success = self.vpn_manager.connect_vpn(location) + if success: + location.connected = True + self.log_view.log_success(f"Connected to {location.name}") + else: + self.log_view.log_error( + f"Failed to connect to {location.name}") + self.render_customers() + # Update VPN status after connection change + self.update_vpn_status() + def open_service(self, service): # Get the host IP from context - this would need to be passed properly in a real implementation print( @@ -343,6 +652,23 @@ class VPNManagerWindow: self.quit_app() return False + def update_vpn_status(self): + """Update location connection status from actual VPN manager.""" + if not self.vpn_manager: + return + + # Only update status for active locations to avoid unnecessary nmcli calls + for customer in self.customers: + for location in customer.locations: + if location.active: # Only check active locations + try: + status = self.vpn_manager.get_connection_status( + location) + location.connected = (status == VPNStatus.CONNECTED) + except VPNConnectionError: + # If we can't get status, assume disconnected + location.connected = False + def quit_app(self, _widget=None): # Stop the tray icon if hasattr(self, 'tray_icon'): diff --git a/models.py b/models.py index 8eed863..29d1551 100644 --- a/models.py +++ b/models.py @@ -40,12 +40,45 @@ class VPNType(Enum): IPSEC = "IPSec" +@dataclass +class NetworkSegment: + """Represents a network segment with metadata.""" + name: str # "LAN", "DMZ", "Management" + cidr: str # "192.168.1.0/24" + vlan_id: Optional[int] = None # VLAN 100 + zone: str = "general" # "production", "dmz", "management", "guest" + gateway: Optional[str] = None # "192.168.1.1" + description: str = "" # "Main office network" + + +@dataclass +class PortForwarding: + """Represents a port forwarding rule for external access.""" + external_port: int # Port on external address (e.g., 8080) + # Target internal IP (e.g., "192.168.1.10") + internal_ip: str + internal_port: int # Target internal port (e.g., 80) + protocol: str = "tcp" # "tcp", "udp", or "both" + description: str = "" # "Web server access" + enabled: bool = True # Whether the forwarding is active + + +@dataclass +class HostIP: + """IP address with network segment context.""" + ip_address: str + network_segment: str # References NetworkSegment.name + is_primary: bool = False # Primary interface for this host + + @dataclass class Host: """Represents a physical or virtual host at a location.""" name: str - ip_address: str - host_type: HostType + ip_addresses: List[HostIP] = field(default_factory=list) + host_type: HostType = HostType.LINUX + # Icon name without extension (e.g., 'ubuntu', 'windows') + icon: Optional[str] = None description: str = "" services: List[Service] = field(default_factory=list) sub_hosts: List['Host'] = field( @@ -62,6 +95,38 @@ class Host: """Check if this host has sub-hosts (VMs).""" return len(self.sub_hosts) > 0 + def get_primary_ip(self) -> str: + """Get the primary IP address, or first IP if no primary set.""" + if not self.ip_addresses: + return "" + + # Look for explicitly marked primary + for host_ip in self.ip_addresses: + if host_ip.is_primary: + return host_ip.ip_address + + # Fall back to first IP + return self.ip_addresses[0].ip_address + + def get_ip_display(self) -> str: + """Get a display string for IP addresses.""" + if not self.ip_addresses: + return "No IP" + elif len(self.ip_addresses) == 1: + return self.ip_addresses[0].ip_address + else: + primary_ip = self.get_primary_ip() + return f"{primary_ip} (+{len(self.ip_addresses)-1} more)" + + def get_all_ips(self) -> List[str]: + """Get all IP addresses as a simple list.""" + return [host_ip.ip_address for host_ip in self.ip_addresses] + + def get_ips_in_segment(self, segment_name: str) -> List[str]: + """Get all IP addresses in a specific network segment.""" + return [host_ip.ip_address for host_ip in self.ip_addresses + if host_ip.network_segment == segment_name] + @dataclass class Location: @@ -72,11 +137,24 @@ class Location: active: bool = False vpn_config: str = "" # Path to VPN config or connection details hosts: List[Host] = field(default_factory=list) - + + # Enhanced network configuration + network_segments: List[NetworkSegment] = field( + default_factory=list) # Network segments with rich metadata + external_addresses: List[str] = field( + default_factory=list) # External VPN endpoints + port_forwardings: List[PortForwarding] = field( + default_factory=list) # Port forwarding rules + + # Legacy field for backward compatibility (will be deprecated) + # Simple network list (legacy) + networks: List[str] = field(default_factory=list) + # VPN connection management fields - nmcli_connection_name: Optional[str] = None # NetworkManager connection name + # NetworkManager connection name + nmcli_connection_name: Optional[str] = None auto_import: bool = True # Auto-import .ovpn file if not in NetworkManager - + # Credential storage - can be: # - Passbolt UUID string (for future use) # - Dict with 'username' and 'password' keys @@ -112,6 +190,109 @@ class Location: """Get all hosts that have sub-hosts (hypervisors).""" return [host for host in self.get_all_hosts_flat() if host.is_hypervisor()] + def get_segment_by_name(self, segment_name: str) -> Optional[NetworkSegment]: + """Get a network segment by its name.""" + return next((seg for seg in self.network_segments if seg.name == segment_name), None) + + def get_hosts_in_segment(self, segment_name: str) -> List[Host]: + """Get all hosts that have IPs in the specified network segment.""" + hosts = [] + for host in self.get_all_hosts_flat(): + if any(host_ip.network_segment == segment_name for host_ip in host.ip_addresses): + hosts.append(host) + return hosts + + def get_segments_by_zone(self, zone: str) -> List[NetworkSegment]: + """Get all network segments in a specific zone.""" + return [seg for seg in self.network_segments if seg.zone == zone] + + def get_port_forwardings_for_host(self, host_ip: str) -> List[PortForwarding]: + """Get all port forwardings targeting a specific host IP.""" + return [pf for pf in self.port_forwardings if pf.internal_ip == host_ip and pf.enabled] + + def get_externally_accessible_services(self) -> List[tuple]: + """Get all services accessible from external addresses via port forwarding. + + Returns list of tuples: (external_address, external_port, host, service, port_forwarding) + """ + accessible_services = [] + + for external_addr in self.external_addresses: + for port_forward in self.port_forwardings: + if not port_forward.enabled: + continue + + # Find the host that owns the target IP + target_host = None + target_service = None + + for host in self.get_all_hosts_flat(): + host_ips = [hip.ip_address for hip in host.ip_addresses] + if port_forward.internal_ip in host_ips: + target_host = host + + # Find matching service on this host + for service in host.services: + if service.port == port_forward.internal_port: + target_service = service + break + break + + if target_host: + accessible_services.append(( + external_addr, + port_forward.external_port, + target_host, + target_service, # May be None if no matching service defined + port_forward + )) + + return accessible_services + + def is_service_externally_accessible(self, host_ip: str, service_port: int) -> bool: + """Check if a specific service is accessible from external addresses.""" + for pf in self.port_forwardings: + if (pf.enabled and + pf.internal_ip == host_ip and + pf.internal_port == service_port): + return True + return False + + def is_service_reachable(self, host: 'Host', service: Service) -> bool: + """Check if a service is reachable (either via VPN connection or port forwarding). + + Returns True if: + - VPN is connected (all internal services become reachable) + - Service has a port forwarding rule enabled + """ + # If VPN is connected, all services are reachable + if self.connected: + return True + + # Check if service is externally accessible via port forwarding + for host_ip in host.ip_addresses: + if self.is_service_externally_accessible(host_ip.ip_address, service.port): + return True + + return False + + def get_external_url_for_service(self, host: 'Host', service: Service) -> Optional[str]: + """Get the external URL for a service if it has port forwarding. + + Returns the external URL (e.g., "https://vpn.example.com:8006") or None. + """ + for host_ip in host.ip_addresses: + for pf in self.port_forwardings: + if (pf.enabled and + pf.internal_ip == host_ip.ip_address and + pf.internal_port == service.port): + # Use first external address if available + if self.external_addresses: + protocol = "https" if service.port in [ + 443, 8006, 8080] else "http" + return f"{protocol}://{self.external_addresses[0]}:{pf.external_port}" + return None + @dataclass class CustomerService: diff --git a/services/__init__.py b/services/__init__.py index 003ea75..befb764 100644 --- a/services/__init__.py +++ b/services/__init__.py @@ -1,17 +1,8 @@ """Services package for VPN and password management.""" -from .vpn_manager import VPNManager, VPNConnectionError, VPNStatus, VPNConnection -from .passbolt_client import PassboltClient, PassboltError, PassboltCredential -from .connection_manager import ConnectionManager, ConnectionConfig - +from .vpn_manager import VPNManager, VPNConnectionError, VPNStatus __all__ = [ 'VPNManager', - 'VPNConnection', - 'VPNConnectionError', - 'VPNStatus', - 'PassboltClient', - 'PassboltCredential', - 'PassboltError', - 'ConnectionManager', - 'ConnectionConfig', -] \ No newline at end of file + 'VPNConnectionError', + 'VPNStatus' +] diff --git a/services/connection_manager.py b/services/connection_manager.py deleted file mode 100644 index 016e2de..0000000 --- a/services/connection_manager.py +++ /dev/null @@ -1,266 +0,0 @@ -"""High-level connection manager that integrates VPN and Passbolt.""" - -import logging -from typing import Optional, Dict, Any -from pathlib import Path -from dataclasses import dataclass - -from .vpn_manager import VPNManager, VPNStatus, VPNConnectionError -from .passbolt_client import PassboltClient, PassboltError - - -logger = logging.getLogger(__name__) - - -@dataclass -class ConnectionConfig: - """Configuration for a VPN connection.""" - name: str - vpn_config_path: str - nmcli_connection_name: Optional[str] = None - auto_import: bool = True # Auto-import .ovpn if not in NetworkManager - # Credentials can be: - # - Passbolt UUID string (for future implementation) - # - Dict with 'username' and 'password' keys - # - None if no credentials needed - vpn_credentials: Optional[dict | str] = None - - -class ConnectionManager: - """Manages VPN connections with Passbolt credential integration.""" - - def __init__(self, use_passbolt: bool = True): - """Initialize the connection manager. - - Args: - use_passbolt: Whether to use Passbolt for credentials - """ - self.vpn_manager = VPNManager() - self.passbolt_client = None - - if use_passbolt: - try: - self.passbolt_client = PassboltClient() - logger.info("Passbolt client initialized successfully") - except PassboltError as e: - logger.warning(f"Passbolt not available: {e}") - logger.info("Falling back to manual credential entry") - - def connect_location(self, config: ConnectionConfig, - username: Optional[str] = None, - password: Optional[str] = None) -> None: - """Connect to a VPN location. - - Args: - config: Connection configuration - username: Override username (if not using Passbolt) - password: Override password (if not using Passbolt) - """ - # Ensure connection exists in NetworkManager - connection_name = self._ensure_connection(config) - - # Get credentials - check overrides first, then config - if not username or not password: - creds_username, creds_password = self._get_credentials_from_config(config) - username = username or creds_username - password = password or creds_password - - if not username or not password: - logger.info(f"No credentials provided for {connection_name}") - # nmcli will prompt for credentials - - # Connect - try: - logger.info(f"Connecting to {connection_name}") - self.vpn_manager.connect(connection_name, username, password) - logger.info(f"Successfully connected to {connection_name}") - except VPNConnectionError as e: - logger.error(f"Failed to connect to {connection_name}: {e}") - raise - - def disconnect_location(self, config: ConnectionConfig) -> None: - """Disconnect from a VPN location. - - Args: - config: Connection configuration - """ - connection_name = config.nmcli_connection_name or config.name - - if not self.vpn_manager.connection_exists(connection_name): - logger.warning(f"Connection {connection_name} does not exist") - return - - try: - logger.info(f"Disconnecting from {connection_name}") - self.vpn_manager.disconnect(connection_name) - logger.info(f"Successfully disconnected from {connection_name}") - except VPNConnectionError as e: - logger.error(f"Failed to disconnect from {connection_name}: {e}") - raise - - def get_connection_status(self, config: ConnectionConfig) -> VPNStatus: - """Get the status of a VPN connection. - - Args: - config: Connection configuration - - Returns: - Current VPN status - """ - connection_name = config.nmcli_connection_name or config.name - - if not self.vpn_manager.connection_exists(connection_name): - return VPNStatus.DISCONNECTED - - return self.vpn_manager.get_status(connection_name) - - def _ensure_connection(self, config: ConnectionConfig) -> str: - """Ensure VPN connection exists in NetworkManager. - - Args: - config: Connection configuration - - Returns: - Name of the NetworkManager connection - """ - connection_name = config.nmcli_connection_name or config.name - - # Check if connection already exists - if self.vpn_manager.connection_exists(connection_name): - logger.debug(f"Connection {connection_name} already exists") - return connection_name - - # Import if auto_import is enabled and config file exists - if config.auto_import and config.vpn_config_path: - vpn_file = Path(config.vpn_config_path) - if vpn_file.exists(): - logger.info(f"Importing VPN configuration from {vpn_file}") - imported_name = self.vpn_manager.import_ovpn( - str(vpn_file), - connection_name - ) - logger.info(f"Imported connection as {imported_name}") - return imported_name - else: - raise VPNConnectionError( - f"VPN config file not found: {config.vpn_config_path}" - ) - - raise VPNConnectionError( - f"Connection {connection_name} does not exist and auto-import is disabled" - ) - - def _get_credentials_from_config(self, config: ConnectionConfig) -> tuple[Optional[str], Optional[str]]: - """Get credentials from the configuration. - - Args: - config: Connection configuration - - Returns: - Tuple of (username, password) or (None, None) - """ - if not config.vpn_credentials: - return None, None - - # If it's a dict with username/password - if isinstance(config.vpn_credentials, dict): - username = config.vpn_credentials.get('username') - password = config.vpn_credentials.get('password') - return username, password - - # If it's a string (Passbolt UUID for future use) - if isinstance(config.vpn_credentials, str): - # For now, try to use Passbolt if available - if self.passbolt_client: - try: - return self._get_passbolt_credentials(config.vpn_credentials) - except (PassboltError, ValueError) as e: - logger.warning(f"Failed to get Passbolt credentials: {e}") - else: - logger.warning(f"Passbolt UUID provided but Passbolt client not available") - - return None, None - - def _get_passbolt_credentials(self, resource_id: str) -> tuple[str, str]: - """Get credentials from Passbolt. - - Args: - resource_id: Passbolt resource UUID - - Returns: - Tuple of (username, password) - """ - if not self.passbolt_client: - raise ValueError("Passbolt client not initialized") - - try: - credential = self.passbolt_client.get_credential(resource_id) - - if not credential.username or not credential.password: - raise ValueError( - f"Incomplete credentials for resource {resource_id}") - - return credential.username, credential.password - - except PassboltError as e: - logger.error(f"Failed to get Passbolt credentials: {e}") - raise - - def validate_passbolt_resource(self, resource_id: str) -> bool: - """Validate that a Passbolt resource exists and has required fields. - - Args: - resource_id: Passbolt resource UUID - - Returns: - True if resource is valid for VPN use - """ - if not self.passbolt_client: - return False - - try: - credential = self.passbolt_client.get_credential(resource_id) - return bool(credential.username and credential.password) - except PassboltError: - return False - - def import_all_configs(self, configs: list[ConnectionConfig]) -> Dict[str, bool]: - """Import multiple VPN configurations. - - Args: - configs: List of connection configurations - - Returns: - Dictionary mapping connection names to success status - """ - results = {} - - for config in configs: - try: - connection_name = self._ensure_connection(config) - results[connection_name] = True - logger.info(f"Successfully imported {connection_name}") - except VPNConnectionError as e: - results[config.name] = False - logger.error(f"Failed to import {config.name}: {e}") - - return results - - def cleanup_connection(self, config: ConnectionConfig, - remove_from_nm: bool = False) -> None: - """Clean up a VPN connection. - - Args: - config: Connection configuration - remove_from_nm: Whether to remove from NetworkManager - """ - connection_name = config.nmcli_connection_name or config.name - - # Disconnect if connected - if self.get_connection_status(config) == VPNStatus.CONNECTED: - self.disconnect_location(config) - - # Remove from NetworkManager if requested - if remove_from_nm and self.vpn_manager.connection_exists(connection_name): - logger.info(f"Removing {connection_name} from NetworkManager") - self.vpn_manager.delete_connection(connection_name) diff --git a/services/passbolt_client.py b/services/passbolt_client.py deleted file mode 100644 index 3b948b4..0000000 --- a/services/passbolt_client.py +++ /dev/null @@ -1,369 +0,0 @@ -"""Passbolt CLI integration for secure credential management.""" - -import subprocess -import json -import os -from dataclasses import dataclass -from typing import Optional, List, Dict, Any -from enum import Enum -from pathlib import Path - - -class PassboltResourceType(Enum): - """Types of resources in Passbolt.""" - PASSWORD = "password" - PASSWORD_WITH_DESCRIPTION = "password-with-description" - PASSWORD_STRING = "password-string" - TOTP = "totp" - - -class PassboltError(Exception): - """Exception raised for Passbolt operations.""" - pass - - -@dataclass -class PassboltCredential: - """Represents credentials retrieved from Passbolt.""" - resource_id: str - name: str - username: Optional[str] = None - password: Optional[str] = None - uri: Optional[str] = None - description: Optional[str] = None - resource_type: PassboltResourceType = PassboltResourceType.PASSWORD - - -@dataclass -class PassboltResource: - """Represents a Passbolt resource.""" - id: str - name: str - username: Optional[str] = None - uri: Optional[str] = None - resource_type: str = "password" - folder_parent_id: Optional[str] = None - personal: bool = False - - -class PassboltClient: - """Client for interacting with Passbolt through the CLI.""" - - def __init__(self, passbolt_cli_path: str = "passbolt"): - """Initialize Passbolt client. - - Args: - passbolt_cli_path: Path to the passbolt CLI executable - """ - self.cli_path = passbolt_cli_path - self._check_cli_available() - self._check_authentication() - - def _check_cli_available(self) -> None: - """Check if Passbolt CLI is available.""" - try: - subprocess.run([self.cli_path, '--version'], - capture_output=True, check=True) - except (subprocess.CalledProcessError, FileNotFoundError): - raise PassboltError( - f"Passbolt CLI not found at '{self.cli_path}'. " - "Please install: https://github.com/passbolt/go-passbolt-cli" - ) - - def _check_authentication(self) -> None: - """Check if authenticated with Passbolt.""" - try: - # Try to list resources to check auth - self._run_passbolt(['list', '--json'], check=True) - except PassboltError: - raise PassboltError( - "Not authenticated with Passbolt. " - "Please run: passbolt auth login" - ) - - def _run_passbolt(self, args: List[str], check: bool = True) -> subprocess.CompletedProcess: - """Run a Passbolt CLI command. - - Args: - args: Command arguments - check: Whether to check return code - - Returns: - Completed process result - """ - try: - result = subprocess.run( - [self.cli_path] + args, - capture_output=True, - text=True, - check=check - ) - return result - except subprocess.CalledProcessError as e: - raise PassboltError(f"Passbolt command failed: {e.stderr}") - - def get_credential(self, resource_id: str) -> PassboltCredential: - """Get a credential by resource ID. - - Args: - resource_id: UUID of the Passbolt resource - - Returns: - PassboltCredential object with username and password - """ - # Get the full resource - result = self._run_passbolt(['get', '--id', resource_id, '--json']) - - try: - data = json.loads(result.stdout) - except json.JSONDecodeError: - raise PassboltError(f"Failed to parse Passbolt response") - - # Extract fields based on resource type - credential = PassboltCredential( - resource_id=resource_id, - name=data.get('name', ''), - username=data.get('username'), - password=data.get('password'), - uri=data.get('uri'), - description=data.get('description') - ) - - # Determine resource type - if 'resource_type' in data: - try: - credential.resource_type = PassboltResourceType( - data['resource_type']) - except ValueError: - pass # Keep default - - return credential - - def get_field(self, resource_id: str, field: str) -> str: - """Get a specific field from a resource. - - Args: - resource_id: UUID of the Passbolt resource - field: Field name (e.g., 'password', 'username', 'uri') - - Returns: - Field value as string - """ - result = self._run_passbolt( - ['get', '--id', resource_id, '--field', field]) - return result.stdout.strip() - - def get_password(self, resource_id: str) -> str: - """Get just the password for a resource. - - Args: - resource_id: UUID of the Passbolt resource - - Returns: - Password string - """ - return self.get_field(resource_id, 'password') - - def get_username(self, resource_id: str) -> str: - """Get just the username for a resource. - - Args: - resource_id: UUID of the Passbolt resource - - Returns: - Username string - """ - return self.get_field(resource_id, 'username') - - def list_resources(self, folder_id: Optional[str] = None, - search: Optional[str] = None) -> List[PassboltResource]: - """List available resources. - - Args: - folder_id: Optional folder ID to filter by - search: Optional search term - - Returns: - List of PassboltResource objects - """ - args = ['list', '--json'] - - if folder_id: - args.extend(['--folder', folder_id]) - if search: - args.extend(['--filter', search]) - - result = self._run_passbolt(args) - - try: - data = json.loads(result.stdout) - except json.JSONDecodeError: - return [] - - resources = [] - for item in data: - resources.append(PassboltResource( - id=item['id'], - name=item.get('name', ''), - username=item.get('username'), - uri=item.get('uri'), - resource_type=item.get('resource_type', 'password'), - folder_parent_id=item.get('folder_parent_id'), - personal=item.get('personal', False) - )) - - return resources - - def find_resource_by_name(self, name: str) -> Optional[PassboltResource]: - """Find a resource by name. - - Args: - name: Name of the resource to find - - Returns: - First matching PassboltResource or None - """ - resources = self.list_resources(search=name) - for resource in resources: - if resource.name == name: - return resource - return None - - def create_resource(self, name: str, username: str, password: str, - uri: Optional[str] = None, - description: Optional[str] = None, - folder_id: Optional[str] = None) -> str: - """Create a new password resource. - - Args: - name: Resource name - username: Username - password: Password - uri: Optional URI/URL - description: Optional description - folder_id: Optional folder to place resource in - - Returns: - ID of created resource - """ - args = ['create', 'resource', - '--name', name, - '--username', username, - '--password', password] - - if uri: - args.extend(['--uri', uri]) - if description: - args.extend(['--description', description]) - if folder_id: - args.extend(['--folder', folder_id]) - - result = self._run_passbolt(args) - - # Parse the ID from output - # Output format: "Resource created: " - for line in result.stdout.split('\n'): - if 'created' in line.lower() and ':' in line: - parts = line.split(':', 1) - if len(parts) == 2: - return parts[1].strip() - - raise PassboltError("Failed to parse created resource ID") - - def update_resource(self, resource_id: str, - name: Optional[str] = None, - username: Optional[str] = None, - password: Optional[str] = None, - uri: Optional[str] = None, - description: Optional[str] = None) -> None: - """Update an existing resource. - - Args: - resource_id: ID of resource to update - name: New name (optional) - username: New username (optional) - password: New password (optional) - uri: New URI (optional) - description: New description (optional) - """ - args = ['update', 'resource', '--id', resource_id] - - if name: - args.extend(['--name', name]) - if username: - args.extend(['--username', username]) - if password: - args.extend(['--password', password]) - if uri: - args.extend(['--uri', uri]) - if description: - args.extend(['--description', description]) - - self._run_passbolt(args) - - def delete_resource(self, resource_id: str) -> None: - """Delete a resource. - - Args: - resource_id: ID of resource to delete - """ - self._run_passbolt(['delete', 'resource', '--id', resource_id]) - - def share_resource(self, resource_id: str, user_id: str, - permission: str = "read") -> None: - """Share a resource with another user. - - Args: - resource_id: ID of resource to share - user_id: ID of user to share with - permission: Permission level ('read', 'update', 'owner') - """ - self._run_passbolt([ - 'share', 'resource', - '--id', resource_id, - '--user', user_id, - '--permission', permission - ]) - - def list_folders(self) -> List[Dict[str, Any]]: - """List all folders. - - Returns: - List of folder dictionaries - """ - result = self._run_passbolt(['list', 'folder', '--json']) - - try: - return json.loads(result.stdout) - except json.JSONDecodeError: - return [] - - def get_folder_by_name(self, name: str) -> Optional[Dict[str, Any]]: - """Find a folder by name. - - Args: - name: Folder name to search for - - Returns: - Folder dictionary or None - """ - folders = self.list_folders() - for folder in folders: - if folder.get('name') == name: - return folder - return None - - def validate_resource_id(self, resource_id: str) -> bool: - """Check if a resource ID exists and is accessible. - - Args: - resource_id: UUID of the resource - - Returns: - True if resource exists and is accessible - """ - try: - self._run_passbolt(['get', '--id', resource_id, '--field', 'name']) - return True - except PassboltError: - return False diff --git a/services/vpn_manager.py b/services/vpn_manager.py index fbd38f8..1bc3f74 100644 --- a/services/vpn_manager.py +++ b/services/vpn_manager.py @@ -1,13 +1,13 @@ -"""VPN connection management using NetworkManager (nmcli).""" +"""Enhanced VPN management with VPNTray naming and route control.""" import subprocess -import tempfile -import os import re +import logging from dataclasses import dataclass -from typing import Optional, Dict, List +from typing import Optional, List from enum import Enum from pathlib import Path +from models import Location class VPNStatus(Enum): @@ -26,286 +26,482 @@ class VPNConnectionError(Exception): @dataclass -class VPNConnection: - """Represents a NetworkManager VPN connection.""" +class VPNConnectionInfo: + """Information about a VPN connection.""" name: str uuid: str - type: str + vpntray_name: str # Our custom name with vpntray_ prefix + status: VPNStatus device: Optional[str] = None - state: VPNStatus = VPNStatus.UNKNOWN - vpn_type: Optional[str] = None # OpenVPN, WireGuard, etc. + routes: List[str] = None # List of routes added class VPNManager: - """Manages VPN connections through NetworkManager CLI (nmcli).""" + """Enhanced VPN manager with VPNTray naming and route management.""" + + VPNTRAY_PREFIX = "vpntray_" + VPN_CONFIG_DIR = Path.home() / ".vpntray" / "vpn" def __init__(self): - """Initialize VPN manager and check for nmcli availability.""" + """Initialize VPN manager.""" + self.logger = logging.getLogger(__name__) self._check_nmcli_available() + self._ensure_vpn_config_dir() def _check_nmcli_available(self) -> None: - """Check if nmcli is available on the system.""" + """Check if nmcli is available.""" try: subprocess.run(['nmcli', '--version'], capture_output=True, check=True) except (subprocess.CalledProcessError, FileNotFoundError): raise VPNConnectionError( - "nmcli is not available. Please install NetworkManager.") + "nmcli is not available. Install NetworkManager.") + + def _ensure_vpn_config_dir(self) -> None: + """Ensure VPN config directory exists.""" + self.VPN_CONFIG_DIR.mkdir(parents=True, exist_ok=True) + + def _run_nmcli(self, args: List[str], check: bool = True, timeout: int = 30) -> subprocess.CompletedProcess: + """Run nmcli command with logging and timeout.""" + command = ['nmcli'] + args + command_str = ' '.join(command) - def _run_nmcli(self, args: List[str], check: bool = True) -> subprocess.CompletedProcess: - """Run an nmcli command with error handling.""" try: result = subprocess.run( - ['nmcli'] + args, + command, capture_output=True, text=True, - check=check + check=check, + timeout=timeout # Add timeout to prevent hanging ) + + self.logger.debug(f"Command: {command_str}") + if result.stdout.strip(): + self.logger.debug(f"Output: {result.stdout.strip()}") + if result.stderr.strip(): + self.logger.warning(f"Stderr: {result.stderr.strip()}") + if result.returncode == 0: + self.logger.debug("Command completed successfully") + else: + self.logger.error( + f"Command exited with code: {result.returncode}") + return result + except subprocess.TimeoutExpired: + self.logger.error( + f"Command timed out after {timeout}s: {command_str}") + raise VPNConnectionError( + f"nmcli command timed out after {timeout} seconds") except subprocess.CalledProcessError as e: - raise VPNConnectionError(f"nmcli command failed: {e.stderr}") - - def import_ovpn(self, ovpn_path: str, connection_name: Optional[str] = None) -> str: - """Import an OpenVPN configuration file. - - Args: - ovpn_path: Path to the .ovpn configuration file - connection_name: Optional custom name for the connection - - Returns: - The name of the imported connection - """ - ovpn_file = Path(ovpn_path) - if not ovpn_file.exists(): + self.logger.debug(f"Failed command: {command_str}") + if e.stdout and e.stdout.strip(): + self.logger.debug(f"Output: {e.stdout.strip()}") + if e.stderr and e.stderr.strip(): + self.logger.error(f"Error: {e.stderr.strip()}") + error_details = e.stderr or str(e) raise VPNConnectionError( - f"OpenVPN config file not found: {ovpn_path}") + f"nmcli command failed (exit code {e.returncode}): {error_details}") - # Import the configuration - result = self._run_nmcli([ - 'connection', 'import', 'type', 'openvpn', 'file', str(ovpn_file) - ]) + def _get_vpntray_connection_name(self, config_filename: str) -> str: + """Generate VPNTray-specific connection name.""" + # Remove extension and sanitize + base_name = Path(config_filename).stem + sanitized = re.sub(r'[^a-zA-Z0-9_-]', '_', base_name) + return f"{self.VPNTRAY_PREFIX}{sanitized}" - # Extract connection name from output - # nmcli typically outputs: "Connection 'name' (uuid) successfully added." - match = re.search(r"Connection '([^']+)'", result.stdout) - if not match: - raise VPNConnectionError( - "Failed to parse imported connection name") - - imported_name = match.group(1) - - # Rename if custom name provided - if connection_name and connection_name != imported_name: - self._run_nmcli([ - 'connection', 'modify', imported_name, - 'connection.id', connection_name - ]) - return connection_name - - return imported_name - - def connect(self, connection_name: str, - username: Optional[str] = None, - password: Optional[str] = None) -> None: - """Connect to a VPN. - - Args: - connection_name: Name of the NetworkManager connection - username: Optional username for authentication - password: Optional password for authentication - """ - if username and password: - # Create temporary secrets file - with tempfile.NamedTemporaryFile(mode='w', delete=False) as f: - f.write(f"vpn.secrets.password:{password}\n") - if username: - f.write(f"vpn.data.username:{username}\n") - secrets_file = f.name - - try: - self._run_nmcli([ - 'connection', 'up', connection_name, - 'passwd-file', secrets_file - ]) - finally: - # Always clean up secrets file - os.unlink(secrets_file) - else: - # Connect without credentials (will prompt if needed) - self._run_nmcli(['connection', 'up', connection_name]) - - def disconnect(self, connection_name: str) -> None: - """Disconnect from a VPN. - - Args: - connection_name: Name of the NetworkManager connection - """ - self._run_nmcli(['connection', 'down', connection_name]) - - def get_status(self, connection_name: str) -> VPNStatus: - """Get the status of a VPN connection. - - Args: - connection_name: Name of the NetworkManager connection - - Returns: - Current status of the VPN connection - """ - result = self._run_nmcli( - ['connection', 'show', '--active'], - check=False - ) - - if connection_name in result.stdout: - # Parse the actual state - state_result = self._run_nmcli( - ['connection', 'show', connection_name], - check=False - ) - - if 'GENERAL.STATE:' in state_result.stdout: - if 'activated' in state_result.stdout: - return VPNStatus.CONNECTED - elif 'activating' in state_result.stdout: - return VPNStatus.CONNECTING - elif 'deactivating' in state_result.stdout: - return VPNStatus.DISCONNECTING - - return VPNStatus.DISCONNECTED - - def list_connections(self, vpn_only: bool = True) -> List[VPNConnection]: - """List all NetworkManager connections. - - Args: - vpn_only: If True, only return VPN connections - - Returns: - List of VPNConnection objects - """ - args = ['connection', 'show'] - if vpn_only: - args.extend(['--type', 'vpn']) - - result = self._run_nmcli(args, check=False) + def get_vpn_config_path(self, filename: str) -> Path: + """Get full path to VPN config file.""" + return self.VPN_CONFIG_DIR / filename + def list_vpntray_connections(self) -> List[VPNConnectionInfo]: + """List all VPNTray-managed connections.""" connections = [] - for line in result.stdout.strip().split('\n')[1:]: # Skip header - if not line: - continue - parts = line.split() - if len(parts) >= 4: - name = parts[0] - uuid = parts[1] - conn_type = parts[2] - device = parts[3] if parts[3] != '--' else None + try: + result = self._run_nmcli(['connection', 'show']) + for line in result.stdout.strip().split('\\n'): + if self.VPNTRAY_PREFIX in line: + parts = line.split() + if len(parts) >= 4: + name = parts[0] + uuid = parts[1] + device = parts[3] if parts[3] != '--' else None - # Get current status - status = self.get_status(name) + # Get detailed status + status = self._get_connection_status(name) - connections.append(VPNConnection( - name=name, - uuid=uuid, - type=conn_type, - device=device, - state=status - )) + connections.append(VPNConnectionInfo( + name=name, + uuid=uuid, + vpntray_name=name, + status=status, + device=device + )) + except VPNConnectionError: + pass # No connections or nmcli error return connections - def delete_connection(self, connection_name: str) -> None: - """Delete a NetworkManager connection. + def _get_connection_status(self, connection_name: str) -> VPNStatus: + """Get the status of a specific connection.""" + try: + result = self._run_nmcli(['connection', 'show', connection_name]) - Args: - connection_name: Name of the connection to delete - """ - self._run_nmcli(['connection', 'delete', connection_name]) + # Parse connection state from output + for line in result.stdout.split('\\n'): + if 'GENERAL.STATE:' in line: + state = line.split(':')[1].strip() + if 'activated' in state.lower(): + return VPNStatus.CONNECTED + elif 'activating' in state.lower(): + return VPNStatus.CONNECTING + elif 'deactivating' in state.lower(): + return VPNStatus.DISCONNECTING + else: + return VPNStatus.DISCONNECTED + except VPNConnectionError: + pass - def connection_exists(self, connection_name: str) -> bool: - """Check if a connection exists. + return VPNStatus.UNKNOWN - Args: - connection_name: Name of the connection to check + def import_vpn_config(self, location: Location) -> str: + """Import VPN configuration for a location with VPNTray naming.""" + config_path = self.get_vpn_config_path(location.vpn_config) - Returns: - True if the connection exists - """ - result = self._run_nmcli( - ['connection', 'show', connection_name], - check=False - ) - return result.returncode == 0 + if not config_path.exists(): + raise VPNConnectionError(f"VPN config not found: {config_path}") - def modify_connection(self, connection_name: str, - settings: Dict[str, str]) -> None: - """Modify connection settings. + self.logger.info( + f"Config file exists: {config_path} ({config_path.stat().st_size} bytes)") - Args: - connection_name: Name of the connection to modify - settings: Dictionary of setting key-value pairs - e.g., {'vpn.data.comp-lzo': 'yes'} - """ - for key, value in settings.items(): + vpntray_name = self._get_vpntray_connection_name(location.vpn_config) + + # Check if already imported + if self._get_connection_by_name(vpntray_name): + self.logger.info(f"Connection already imported: {vpntray_name}") + return vpntray_name + + # Import based on VPN type + self.logger.info( + f"Importing {location.vpn_type.value} config: {config_path.name}") + + if location.vpn_type.value == "OpenVPN": + return self._import_openvpn(config_path, vpntray_name, location) + elif location.vpn_type.value == "WireGuard": + return self._import_wireguard(config_path, vpntray_name, location) + else: + raise VPNConnectionError( + f"Unsupported VPN type: {location.vpn_type.value}") + + def _import_openvpn(self, config_path: Path, vpntray_name: str, location: Location) -> str: + """Import OpenVPN configuration with route control.""" + # Import the config file first (nmcli will auto-generate a name) + import_args = [ + 'connection', 'import', 'type', 'openvpn', + 'file', str(config_path) + ] + self.logger.info(f"Running nmcli import: {' '.join(import_args)}") + + try: + result = self._run_nmcli(import_args) + + # Extract the auto-generated connection name from the output + # nmcli outputs: "Connection 'name' (uuid) successfully added." + import re + match = re.search(r"Connection '([^']+)'", result.stdout) + if not match: + raise VPNConnectionError( + "Failed to parse imported connection name from nmcli output") + + auto_generated_name = match.group(1) + self.logger.info( + f"Config imported with auto name: {auto_generated_name}") + + # Rename to our VPNTray naming convention + rename_args = [ + 'connection', 'modify', auto_generated_name, + 'connection.id', vpntray_name + ] + self.logger.info(f"Renaming to: {vpntray_name}") + + self._run_nmcli(rename_args) + self.logger.info( + f"OpenVPN config imported as {vpntray_name}") + + except VPNConnectionError as e: + self.logger.error(f"OpenVPN import failed: {e}") + raise + + # Configure credentials immediately after import if provided + if location.vpn_credentials: + self._configure_credentials(vpntray_name, location) + + # Configure the connection to not route everything by default + self._configure_connection_routes(vpntray_name, location) + + return vpntray_name + + def _import_wireguard(self, config_path: Path, vpntray_name: str, location: Location) -> str: + """Import WireGuard configuration with route control.""" + # Import the config file first (nmcli will auto-generate a name) + import_args = [ + 'connection', 'import', 'type', 'wireguard', + 'file', str(config_path) + ] + self.logger.info( + f"Running nmcli import: {' '.join(import_args)}") + + try: + result = self._run_nmcli(import_args) + + # Extract the auto-generated connection name from the output + # nmcli outputs: "Connection 'name' (uuid) successfully added." + import re + match = re.search(r"Connection '([^']+)'", result.stdout) + if not match: + raise VPNConnectionError( + "Failed to parse imported connection name from nmcli output") + + auto_generated_name = match.group(1) + self.logger.info( + f"Config imported with auto name: {auto_generated_name}") + + # Rename to our VPNTray naming convention + rename_args = [ + 'connection', 'modify', auto_generated_name, + 'connection.id', vpntray_name + ] + self.logger.info(f"Renaming to: {vpntray_name}") + + self._run_nmcli(rename_args) + self.logger.info( + f"WireGuard config imported as {vpntray_name}") + + except VPNConnectionError as e: + self.logger.error(f"WireGuard import failed: {e}") + raise + + # Configure credentials immediately after import if provided + if location.vpn_credentials: + self._configure_credentials(vpntray_name, location) + + # Configure routes + self._configure_connection_routes(vpntray_name, location) + + return vpntray_name + + def _configure_connection_routes(self, connection_name: str, location: Location) -> None: + """Configure connection to only route specified network segments.""" + try: + # Disable automatic default route self._run_nmcli([ 'connection', 'modify', connection_name, - key, value + 'ipv4.never-default', 'true' ]) - def get_connection_details(self, connection_name: str) -> Dict[str, str]: - """Get detailed information about a connection. + # Add routes for each network segment + routes = [] + for segment in location.network_segments: + # Add route for the network segment + routes.append(segment.cidr) - Args: - connection_name: Name of the connection + if routes: + routes_str = ','.join(routes) + self._run_nmcli([ + 'connection', 'modify', connection_name, + 'ipv4.routes', routes_str + ]) + self.logger.info( + f"Configured routes for {connection_name}: {routes_str}") - Returns: - Dictionary of connection properties - """ - result = self._run_nmcli(['connection', 'show', connection_name]) + except VPNConnectionError as e: + self.logger.error(f"Failed to configure routes: {e}") + # Don't fail the import, just log the error - details = {} - for line in result.stdout.strip().split('\n'): - if ':' in line: - key, value = line.split(':', 1) - details[key.strip()] = value.strip() - - return details - - def get_active_vpn_interface(self, connection_name: str) -> Optional[str]: - """Get the network interface used by an active VPN connection. - - Args: - connection_name: Name of the VPN connection - - Returns: - Interface name (e.g., 'tun0') or None if not connected - """ - if self.get_status(connection_name) != VPNStatus.CONNECTED: + def _get_connection_by_name(self, name: str) -> Optional[VPNConnectionInfo]: + """Get connection info by name.""" + try: + # Check if connection exists (simple and fast) + result = self._run_nmcli(['connection', 'show', name], check=False) + if result.returncode == 0: + # Connection exists, create minimal info object + return VPNConnectionInfo( + name=name, + uuid="unknown", + vpntray_name=name, + status=VPNStatus.UNKNOWN # Status will be checked when needed + ) + return None + except VPNConnectionError: return None - details = self.get_connection_details(connection_name) - return details.get('GENERAL.DEVICES') + def connect_vpn(self, location: Location) -> bool: + """Connect to VPN for a location.""" + try: + vpntray_name = self._get_vpntray_connection_name( + location.vpn_config) + config_path = self.get_vpn_config_path(location.vpn_config) + self.logger.info(f"VPN config: {config_path}") + self.logger.info(f"Connection name: {vpntray_name}") - def get_vpn_ip_address(self, connection_name: str) -> Optional[str]: - """Get the IP address assigned to the VPN connection. + # Check if config file exists + if not config_path.exists(): + error_msg = f"VPN config file not found: {config_path}" + self.logger.error(error_msg) + return False - Args: - connection_name: Name of the VPN connection + # Import if not already imported + existing_conn = self._get_connection_by_name(vpntray_name) + if not existing_conn: + self.logger.info( + "Importing VPN config for first time...") + try: + self.import_vpn_config(location) + self.logger.info( + "VPN config imported successfully") + except Exception as import_error: + error_msg = f"Failed to import VPN config: {import_error}" + self.logger.error(error_msg) + return False + else: + self.logger.info( + f"Using existing connection: {existing_conn.status.value}") - Returns: - IP address or None if not connected - """ - interface = self.get_active_vpn_interface(connection_name) - if not interface: - return None + # Connect with simple command - credentials already set during import + self.logger.info("Attempting connection...") - result = self._run_nmcli(['device', 'show', interface], check=False) + # Simple connection command without credential complications + connect_args = ['connection', 'up', vpntray_name] + self._run_nmcli(connect_args, timeout=60) + self.logger.info(f"Connected to {vpntray_name}") - for line in result.stdout.split('\n'): - if 'IP4.ADDRESS' in line and 'IP4.ADDRESS[2]' not in line: - # Format is usually "IP4.ADDRESS[1]: 10.0.0.1/24" - if ':' in line: - addr_part = line.split(':', 1)[1].strip() - if '/' in addr_part: - return addr_part.split('/')[0] + return True - return None + except VPNConnectionError as e: + self.logger.error(f"VPN connection failed: {e}") + return False + except Exception as e: + self.logger.error( + f"Unexpected error during connection: {e}") + return False + + def disconnect_vpn(self, location: Location) -> bool: + """Disconnect VPN for a location.""" + try: + vpntray_name = self._get_vpntray_connection_name( + location.vpn_config) + self.logger.info(f"Disconnecting from {vpntray_name}...") + + # Check if connection exists + existing_conn = self._get_connection_by_name(vpntray_name) + if not existing_conn: + self.logger.error( + f"Connection {vpntray_name} not found") + return False + + # Disconnect + self._run_nmcli(['connection', 'down', vpntray_name]) + self.logger.info(f"Disconnected from {vpntray_name}") + + return True + + except VPNConnectionError as e: + self.logger.error(f"Failed to disconnect: {e}") + return False + except Exception as e: + self.logger.error( + f"Unexpected error during disconnection: {e}") + return False + + def get_connection_status(self, location: Location) -> VPNStatus: + """Get connection status for a location.""" + vpntray_name = self._get_vpntray_connection_name(location.vpn_config) + return self._get_connection_status(vpntray_name) + + def remove_vpn_config(self, location: Location) -> bool: + """Remove VPN connection configuration.""" + try: + vpntray_name = self._get_vpntray_connection_name( + location.vpn_config) + + # First disconnect if connected + try: + self._run_nmcli( + ['connection', 'down', vpntray_name], check=False) + except VPNConnectionError: + pass # Ignore if already disconnected + + # Remove the connection + self._run_nmcli(['connection', 'delete', vpntray_name]) + self.logger.info( + f"Removed VPN configuration {vpntray_name}") + + return True + + except VPNConnectionError as e: + self.logger.error(f"Failed to remove config: {e}") + return False + + def cleanup_vpntray_connections(self) -> int: + """Remove all VPNTray-managed connections. Returns count removed.""" + connections = self.list_vpntray_connections() + removed_count = 0 + + for conn in connections: + try: + # Disconnect first + self._run_nmcli(['connection', 'down', conn.name], check=False) + # Remove + self._run_nmcli(['connection', 'delete', conn.name]) + removed_count += 1 + except VPNConnectionError: + pass # Continue with other connections + + if self.logger and removed_count > 0: + self.logger.info( + f"Cleaned up {removed_count} VPNTray connections") + + return removed_count + + def _configure_credentials(self, connection_name: str, location: Location) -> None: + """Configure VPN credentials directly in the connection.""" + if not location.vpn_credentials: + self.logger.info( + f"No credentials provided for {connection_name}") + return + + try: + # Handle dictionary credentials (username/password) + if isinstance(location.vpn_credentials, dict): + username = location.vpn_credentials.get('username') + password = location.vpn_credentials.get('password') + self.logger.info( + f"Setting credentials for {connection_name}...") + + # Set username and password with correct nmcli syntax + if username: + self._run_nmcli([ + 'connection', 'modify', connection_name, + '+vpn.data', f'username={username}' + ]) + self.logger.info( + f"Username configured for {connection_name}") + + if password: + self._run_nmcli([ + 'connection', 'modify', connection_name, + '+vpn.secrets', f'password={password}' + ]) + self.logger.info( + f"Password configured for {connection_name}") + + if username and password: + self.logger.info( + f"Full credentials configured for {connection_name}") + elif username or password: + self.logger.info( + f"Partial credentials configured for {connection_name}") + + except VPNConnectionError as e: + self.logger.error(f"Failed to configure credentials: {e}") + # Don't fail the whole operation for credential issues diff --git a/style.css b/style.css new file mode 100644 index 0000000..71180be --- /dev/null +++ b/style.css @@ -0,0 +1,56 @@ +.card { + background: @theme_base_color; + border-radius: 8px; + border: 1px solid @borders; + box-shadow: 0 1px 3px rgba(0, 0, 0, 0.1); + padding: 16px; + margin: 6px; +} + +.location-info { + background: linear-gradient(to bottom, alpha(@theme_selected_bg_color, 0.1), alpha(@theme_selected_bg_color, 0.05)); + border-radius: 8px; + border: 1px solid alpha(@theme_selected_bg_color, 0.3); + padding: 12px; + margin-bottom: 12px; +} + +.log-section { + background: @theme_base_color; + border-top: 1px solid @borders; + border-radius: 8px 8px 0 0; +} + +/* Material Icons font */ +.material-icons { + font-family: "Material Icons"; + font-weight: normal; + font-style: normal; + font-size: 18px; +} + +/* Service button color coding with Material Icons */ +.service-icon-accessible { + font-family: "Material Icons"; + color: #4caf50; + /* Green for accessible */ + font-size: 18px; +} + +.service-icon-inaccessible { + font-family: "Material Icons"; + color: #f44336; + /* Red for not accessible */ + font-size: 18px; +} + +.service-icon-accessible:hover { + color: #2e7d32; + /* Darker green on hover */ +} + +.service-icon-inaccessible:disabled { + color: #ef9a9a; + /* Lighter red when disabled */ + opacity: 0.6; +} \ No newline at end of file diff --git a/utils/__init__.py b/utils/__init__.py new file mode 100644 index 0000000..38185a9 --- /dev/null +++ b/utils/__init__.py @@ -0,0 +1,5 @@ +"""Utility modules for VPN Manager.""" + +from .icon_loader import IconLoader + +__all__ = ['IconLoader'] \ No newline at end of file diff --git a/utils/icon_loader.py b/utils/icon_loader.py new file mode 100644 index 0000000..b928a5a --- /dev/null +++ b/utils/icon_loader.py @@ -0,0 +1,142 @@ +"""Icon loader utility for host icons with fallback support.""" + +import os +from pathlib import Path +import gi +gi.require_version('Gtk', '3.0') +from gi.repository import Gtk, GdkPixbuf +from models import HostType + + +class IconLoader: + """Manages loading of host icons with fallback to Material Icons.""" + + # Default icon size + ICON_SIZE = 20 + + # Project root directory + PROJECT_ROOT = Path(__file__).parent.parent + ICONS_DIR = PROJECT_ROOT / "assets" / "icons" + + # Material Icons fallback mapping for host types + HOST_TYPE_ICONS = { + HostType.LINUX: "computer", + HostType.WINDOWS: "desktop_windows", + HostType.WINDOWS_SERVER: "dns", + HostType.PROXMOX: "developer_board", + HostType.ESXI: "developer_board", + HostType.ROUTER: "router", + HostType.SWITCH: "device_hub" + } + + @classmethod + def get_host_icon_widget(cls, host, size=None) -> Gtk.Widget: + """Get an icon widget for a host, either custom SVG or Material Icon fallback. + + Args: + host: Host object with optional icon field + size: Icon size in pixels (default: ICON_SIZE) + + Returns: + Gtk.Image if custom icon exists, Gtk.Label with Material Icon otherwise + """ + if size is None: + size = cls.ICON_SIZE + + # Try custom icon first + if host.icon: + icon_widget = cls._load_custom_icon(host.icon, size) + if icon_widget: + return icon_widget + + # Fallback to Material Icons based on host type + return cls._create_material_icon(host.host_type, size) + + @classmethod + def _load_custom_icon(cls, icon_name: str, size: int) -> Gtk.Image: + """Load a custom SVG icon from assets/icons directory. + + Args: + icon_name: Name of the icon file without extension (e.g., 'ubuntu') + size: Icon size in pixels + + Returns: + Gtk.Image if icon exists, None otherwise + """ + # Try SVG first, then PNG + for extension in ['.svg', '.png']: + icon_path = cls.ICONS_DIR / f"{icon_name}{extension}" + + if icon_path.exists(): + try: + # Load and scale the icon + pixbuf = GdkPixbuf.Pixbuf.new_from_file_at_size( + str(icon_path), size, size + ) + image = Gtk.Image.new_from_pixbuf(pixbuf) + return image + except Exception as e: + print(f"Failed to load icon {icon_path}: {e}") + + return None + + @classmethod + def _create_material_icon(cls, host_type: HostType, size: int) -> Gtk.Label: + """Create a Material Icon label for a host type. + + Args: + host_type: HostType enum value + size: Icon size in pixels + + Returns: + Gtk.Label with Material Icon + """ + icon_name = cls.HOST_TYPE_ICONS.get(host_type, "computer") + + label = Gtk.Label() + label.set_text(icon_name) + label.get_style_context().add_class("material-icons") + + # Apply custom CSS for size + css = f""" + #{label.get_name()} {{ + font-size: {size}px; + }} + """ + + return label + + @classmethod + def get_service_icon(cls, service_type: str, is_accessible: bool) -> Gtk.Label: + """Get a Material Icon for a service with color coding. + + Args: + service_type: Service type string (e.g., 'SSH', 'Web GUI') + is_accessible: Whether the service is currently accessible + + Returns: + Gtk.Label with colored Material Icon + """ + # Service type to Material Icons mapping + service_icons = { + 'SSH': 'terminal', + 'Web GUI': 'language', + 'RDP': 'desktop_windows', + 'VNC': 'monitor', + 'SMB': 'folder_shared', + 'Database': 'storage', + 'FTP': 'cloud_upload' + } + + icon_name = service_icons.get(service_type, 'settings') + + label = Gtk.Label() + label.set_text(icon_name) + + # Apply color based on accessibility + if is_accessible: + label.get_style_context().add_class("service-icon-accessible") + else: + label.get_style_context().add_class("service-icon-inaccessible") + + return label \ No newline at end of file diff --git a/views/__init__.py b/views/__init__.py index 30d1617..502e8be 100644 --- a/views/__init__.py +++ b/views/__init__.py @@ -1,4 +1,5 @@ from .active_view import ActiveView from .inactive_view import InactiveView +from .log_view import LogView, LogLevel -__all__ = ['ActiveView', 'InactiveView'] \ No newline at end of file +__all__ = ['ActiveView', 'InactiveView', 'LogView', 'LogLevel'] \ No newline at end of file diff --git a/views/active_view.py b/views/active_view.py index 20f2778..b00f09d 100644 --- a/views/active_view.py +++ b/views/active_view.py @@ -1,62 +1,65 @@ +from widgets import ActiveCustomerCard +from gi.repository import Gtk import gi gi.require_version('Gtk', '3.0') -from gi.repository import Gtk -from widgets import ActiveCustomerCard class ActiveView: """View for displaying active customer locations.""" - + def __init__(self, callbacks): self.callbacks = callbacks self.widget = self._create_widget() - + def _create_widget(self): """Create the main container for active locations.""" # Main container vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=12) - + # Scrolled window for content scrolled = Gtk.ScrolledWindow() scrolled.set_policy(Gtk.PolicyType.NEVER, Gtk.PolicyType.AUTOMATIC) scrolled.set_shadow_type(Gtk.ShadowType.NONE) vbox.pack_start(scrolled, True, True, 0) - + # Content box - self.content_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=12) + self.content_box = Gtk.Box( + orientation=Gtk.Orientation.VERTICAL, spacing=12) scrolled.add(self.content_box) - + return vbox - + def update(self, customers): """Update the view with new customer data. - + Args: customers: List of Customer objects with active locations to display """ # Clear existing content for child in self.content_box.get_children(): child.destroy() - + if customers: # Add customer cards for customer in customers: customer_card = ActiveCustomerCard(customer, self.callbacks) - self.content_box.pack_start(customer_card.widget, False, False, 0) + self.content_box.pack_start( + customer_card.widget, False, False, 0) else: # Show empty state message no_active_label = Gtk.Label() - no_active_label.set_markup("No active locations") + no_active_label.set_markup( + "No active locations") no_active_label.set_margin_top(20) self.content_box.pack_start(no_active_label, False, False, 0) - + self.content_box.show_all() - + def set_visible(self, visible): """Set visibility of the entire view.""" self.widget.set_visible(visible) - + def clear(self): """Clear all content from the view.""" for child in self.content_box.get_children(): - child.destroy() \ No newline at end of file + child.destroy() diff --git a/views/inactive_view.py b/views/inactive_view.py index 34845b3..131e162 100644 --- a/views/inactive_view.py +++ b/views/inactive_view.py @@ -1,52 +1,54 @@ +from widgets import InactiveCustomerCard +from gi.repository import Gtk import gi gi.require_version('Gtk', '3.0') -from gi.repository import Gtk -from widgets import InactiveCustomerCard class InactiveView: """View for displaying inactive customer locations (search results).""" - + def __init__(self, callbacks): self.callbacks = callbacks self.widget = self._create_widget() self.current_search = "" - + def _create_widget(self): """Create the main container for inactive/search results.""" # Main container vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=12) - + # Scrolled window for content scrolled = Gtk.ScrolledWindow() scrolled.set_policy(Gtk.PolicyType.NEVER, Gtk.PolicyType.AUTOMATIC) scrolled.set_shadow_type(Gtk.ShadowType.NONE) vbox.pack_start(scrolled, True, True, 0) - + # Content box - self.content_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=12) + self.content_box = Gtk.Box( + orientation=Gtk.Orientation.VERTICAL, spacing=12) scrolled.add(self.content_box) - + return vbox - + def update(self, customers, search_term=""): """Update the view with search results. - + Args: customers: List of Customer objects with inactive locations to display search_term: The current search term """ self.current_search = search_term - + # Clear existing content for child in self.content_box.get_children(): child.destroy() - + if customers: # Add customer cards for customer in customers: customer_card = InactiveCustomerCard(customer, self.callbacks) - self.content_box.pack_start(customer_card.widget, False, False, 0) + self.content_box.pack_start( + customer_card.widget, False, False, 0) else: # Show no results message if search_term: @@ -56,14 +58,14 @@ class InactiveView: ) no_results_label.set_margin_top(20) self.content_box.pack_start(no_results_label, False, False, 0) - + self.content_box.show_all() - + def set_visible(self, visible): """Set visibility of the entire view.""" self.widget.set_visible(visible) - + def clear(self): """Clear all content from the view.""" for child in self.content_box.get_children(): - child.destroy() \ No newline at end of file + child.destroy() diff --git a/views/log_view.py b/views/log_view.py new file mode 100644 index 0000000..06ce0ea --- /dev/null +++ b/views/log_view.py @@ -0,0 +1,275 @@ +"""Log view for displaying command output and system logs.""" + +from enum import Enum +from typing import Optional +import time +from gi.repository import Gtk, GLib, Pango +import gi +gi.require_version('Gtk', '3.0') + + +class LogLevel(Enum): + """Log levels for different types of messages.""" + DEBUG = "DEBUG" + INFO = "INFO" + WARNING = "WARNING" + ERROR = "ERROR" + COMMAND = "COMMAND" + + +class LogView: + """View for displaying logs and command output.""" + + def __init__(self): + self.widget = self._create_widget() + self.max_lines = 1000 # Maximum number of log lines to keep + self.auto_scroll = True + + def _create_widget(self): + """Create the main log view widget.""" + # Main container + vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=0) + + # Header with controls + header_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=8) + header_box.set_margin_start(12) + header_box.set_margin_end(12) + header_box.set_margin_top(8) + header_box.set_margin_bottom(8) + vbox.pack_start(header_box, False, False, 0) + + # Log title + log_label = Gtk.Label() + log_label.set_markup("📋 Command Log") + log_label.set_halign(Gtk.Align.START) + header_box.pack_start(log_label, False, False, 0) + + # Spacer + spacer = Gtk.Box() + header_box.pack_start(spacer, True, True, 0) + + # Auto-scroll toggle + self.autoscroll_switch = Gtk.Switch() + self.autoscroll_switch.set_active(True) + self.autoscroll_switch.connect( + "notify::active", self._on_autoscroll_toggle) + header_box.pack_start(self.autoscroll_switch, False, False, 0) + + autoscroll_label = Gtk.Label() + autoscroll_label.set_text("Auto-scroll") + autoscroll_label.set_margin_start(4) + header_box.pack_start(autoscroll_label, False, False, 0) + + # Clear button + clear_btn = Gtk.Button(label="Clear") + clear_btn.connect("clicked", self._on_clear_clicked) + header_box.pack_start(clear_btn, False, False, 0) + + # Separator + separator = Gtk.Separator(orientation=Gtk.Orientation.HORIZONTAL) + vbox.pack_start(separator, False, False, 0) + + # Scrolled window for log content + scrolled = Gtk.ScrolledWindow() + scrolled.set_policy(Gtk.PolicyType.AUTOMATIC, Gtk.PolicyType.AUTOMATIC) + scrolled.set_min_content_height(150) + scrolled.set_max_content_height(400) + vbox.pack_start(scrolled, True, True, 0) + + # Text view for log content + self.text_view = Gtk.TextView() + self.text_view.set_editable(False) + self.text_view.set_cursor_visible(False) + self.text_view.set_wrap_mode(Gtk.WrapMode.WORD) + + # Set monospace font + font_desc = Pango.FontDescription("monospace 9") + self.text_view.modify_font(font_desc) + + scrolled.add(self.text_view) + + # Get text buffer and create tags for different log levels + self.text_buffer = self.text_view.get_buffer() + self._create_text_tags() + + # Store reference to scrolled window for auto-scrolling + self.scrolled_window = scrolled + + return vbox + + def _create_text_tags(self): + """Create text tags for different log levels.""" + # Command tag (bold, blue) + command_tag = self.text_buffer.create_tag("command") + command_tag.set_property("weight", Pango.Weight.BOLD) + command_tag.set_property("foreground", "#0066cc") + + # Info tag (default) + info_tag = self.text_buffer.create_tag("info") + + # Warning tag (orange) + warning_tag = self.text_buffer.create_tag("warning") + warning_tag.set_property("foreground", "#ff8800") + + # Error tag (red, bold) + error_tag = self.text_buffer.create_tag("error") + error_tag.set_property("foreground", "#cc0000") + error_tag.set_property("weight", Pango.Weight.BOLD) + + # Debug tag (gray) + debug_tag = self.text_buffer.create_tag("debug") + debug_tag.set_property("foreground", "#666666") + + # Timestamp tag (small, gray) + timestamp_tag = self.text_buffer.create_tag("timestamp") + timestamp_tag.set_property("foreground", "#888888") + timestamp_tag.set_property("size", 8 * Pango.SCALE) + + def _on_autoscroll_toggle(self, switch, gparam): + """Handle auto-scroll toggle.""" + self.auto_scroll = switch.get_active() + + def _on_clear_clicked(self, button): + """Clear the log content.""" + self.text_buffer.set_text("") + + def _auto_scroll_to_bottom(self): + """Scroll to bottom if auto-scroll is enabled.""" + if not self.auto_scroll: + return + + # Get the end iterator + end_iter = self.text_buffer.get_end_iter() + + # Create a mark at the end + mark = self.text_buffer.get_insert() + self.text_buffer.place_cursor(end_iter) + + # Scroll to the mark + self.text_view.scroll_mark_onscreen(mark) + + def _get_timestamp(self) -> str: + """Get current timestamp string.""" + return time.strftime("%H:%M:%S") + + def _trim_log_if_needed(self): + """Trim log to max_lines if exceeded.""" + line_count = self.text_buffer.get_line_count() + if line_count <= self.max_lines: + return + + # Calculate how many lines to remove (keep some buffer) + lines_to_remove = line_count - (self.max_lines - 100) + + # Get iterator at start + start_iter = self.text_buffer.get_start_iter() + + # Move to the line we want to keep + end_iter = self.text_buffer.get_iter_at_line(lines_to_remove) + + # Delete the old lines + self.text_buffer.delete(start_iter, end_iter) + + def log_message(self, message: str, level: LogLevel = LogLevel.INFO, + command: Optional[str] = None): + """Add a log message to the view. + + Args: + message: The message to log + level: The log level + command: Optional command that generated this message + """ + # Ensure we're on the main thread + GLib.idle_add(self._add_log_message, message, level, command) + + def _add_log_message(self, message: str, level: LogLevel, command: Optional[str]): + """Add log message to buffer (main thread only).""" + timestamp = self._get_timestamp() + + # Get end iterator + end_iter = self.text_buffer.get_end_iter() + + # Add timestamp + self.text_buffer.insert_with_tags_by_name( + end_iter, f"[{timestamp}] ", "timestamp" + ) + + # Add command if provided + if command: + end_iter = self.text_buffer.get_end_iter() + self.text_buffer.insert_with_tags_by_name( + end_iter, f"$ {command}\n", "command" + ) + + # Add the message with appropriate tag + end_iter = self.text_buffer.get_end_iter() + tag_name = level.value.lower() + self.text_buffer.insert_with_tags_by_name( + end_iter, f"{message}\n", tag_name + ) + + # Trim log if needed + self._trim_log_if_needed() + + # Auto-scroll to bottom + self._auto_scroll_to_bottom() + + return False # Remove from idle queue + + def log_command(self, command: str, output: str = "", error: str = "", + return_code: int = 0): + """Log a command execution with its output. + + Args: + command: The command that was executed + output: Standard output from the command + error: Standard error from the command + return_code: Command return code + """ + # Log the command + self.log_message("", LogLevel.COMMAND, command) + + # Log output if present + if output.strip(): + for line in output.strip().split('\n'): + self.log_message(line, LogLevel.INFO) + + # Log error if present + if error.strip(): + for line in error.strip().split('\n'): + self.log_message(f"ERROR: {line}", LogLevel.ERROR) + + # Log return code if non-zero + if return_code != 0: + self.log_message( + f"Command exited with code: {return_code}", LogLevel.ERROR) + elif return_code == 0 and (output.strip() or error.strip()): + self.log_message("Command completed successfully", LogLevel.INFO) + + def log_info(self, message: str): + """Log an info message.""" + self.log_message(message, LogLevel.INFO) + + def log_warning(self, message: str): + """Log a warning message.""" + self.log_message(message, LogLevel.WARNING) + + def log_error(self, message: str): + """Log an error message.""" + self.log_message(message, LogLevel.ERROR) + + def log_debug(self, message: str): + """Log a debug message.""" + self.log_message(message, LogLevel.DEBUG) + + def log_success(self, message: str): + """Log a success message.""" + self.log_message(f"✓ {message}", LogLevel.INFO) + + def set_visible(self, visible: bool): + """Set visibility of the entire view.""" + self.widget.set_visible(visible) + + def clear(self): + """Clear all log content.""" + self._on_clear_clicked(None) diff --git a/widgets/__init__.py b/widgets/__init__.py index 501cb30..0bb0533 100644 --- a/widgets/__init__.py +++ b/widgets/__init__.py @@ -1,11 +1,12 @@ from .host_item import HostItem from .location_card import ActiveLocationCard, InactiveLocationCard -from .customer_card import ActiveCustomerCard, InactiveCustomerCard +from .active_customer_card import ActiveCustomerCard +from .inactive_customer_card import InactiveCustomerCard __all__ = [ 'HostItem', - 'ActiveLocationCard', + 'ActiveLocationCard', 'InactiveLocationCard', 'ActiveCustomerCard', 'InactiveCustomerCard' -] \ No newline at end of file +] diff --git a/widgets/active_customer_card.py b/widgets/active_customer_card.py new file mode 100644 index 0000000..b068c58 --- /dev/null +++ b/widgets/active_customer_card.py @@ -0,0 +1,399 @@ +from utils import IconLoader +from gi.repository import Gtk +import gi +gi.require_version('Gtk', '3.0') + + +def escape_markup(text: str) -> str: + """Escape special characters for Pango markup.""" + return text.replace('&', '&').replace('<', '<').replace('>', '>') + + +class ActiveCustomerCard: + def __init__(self, customer, callbacks): + self.customer = customer + self.callbacks = callbacks + self.expanded = True # Start expanded by default + self.location_expanded = {} # Track expansion state of each location + # Initialize all locations as expanded + for location in self.customer.locations: + self.location_expanded[location.name] = True + self.widget = self._create_widget() + + def _create_widget(self): + # Customer card container + customer_frame = Gtk.Frame() + customer_frame.get_style_context().add_class("card") + customer_frame.set_shadow_type(Gtk.ShadowType.NONE) + + customer_vbox = Gtk.Box( + orientation=Gtk.Orientation.VERTICAL, spacing=4) + customer_frame.add(customer_vbox) + + # Customer header row + customer_row = self._create_customer_header() + customer_vbox.pack_start(customer_row, False, False, 0) + + # Content container (locations) + self.content_box = Gtk.Box( + orientation=Gtk.Orientation.VERTICAL, spacing=6) + self.content_box.set_margin_start(8) + self.content_box.set_margin_end(8) + self.content_box.set_margin_bottom(8) + customer_vbox.pack_start(self.content_box, False, False, 0) + + # Add location cards + for location in self.customer.locations: + location_card = self._create_location_card(location) + self.content_box.pack_start(location_card, False, False, 0) + + return customer_frame + + def _create_customer_header(self): + """Create the customer header row.""" + row = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=8) + row.set_margin_start(4) + row.set_margin_end(4) + row.set_margin_top(4) + row.set_margin_bottom(2) + + # Expand/collapse arrow + self.expand_button = Gtk.Button() + self.expand_button.set_relief(Gtk.ReliefStyle.NONE) + self.expand_button.set_can_focus(False) + self.expand_button.set_size_request(20, 20) + self._update_expand_button() + self.expand_button.connect("clicked", self._on_expand_toggle) + row.pack_start(self.expand_button, False, False, 0) + + # Customer name + customer_label = Gtk.Label() + escaped_name = escape_markup(self.customer.name) + customer_label.set_markup(f"{escaped_name}") + customer_label.set_halign(Gtk.Align.START) + row.pack_start(customer_label, True, True, 0) + + # Customer service icons (right side) + self._add_customer_service_icons(row) + + return row + + def _create_location_card(self, location): + """Create a location card (subcard within customer card).""" + # Location subcard + location_frame = Gtk.Frame() + location_frame.get_style_context().add_class("card") + location_frame.set_shadow_type(Gtk.ShadowType.NONE) + + location_vbox = Gtk.Box( + orientation=Gtk.Orientation.VERTICAL, spacing=2) + location_frame.add(location_vbox) + + # Location header row + location_row = self._create_location_header(location) + location_vbox.pack_start(location_row, False, False, 0) + + # Hosts container (collapsible) + location_key = location.name + hosts_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=2) + hosts_box.set_margin_start(8) + hosts_box.set_margin_end(4) + hosts_box.set_margin_bottom(4) + + # Add hosts for this location + if location.hosts: + for host in location.hosts: + host_row = self._create_host_row(host, location) + hosts_box.pack_start(host_row, False, False, 0) + + # Add sub-hosts (VMs) + if host.sub_hosts: + for vm in host.sub_hosts: + vm_row = self._create_host_row( + vm, location, is_vm=True) + hosts_box.pack_start(vm_row, False, False, 0) + + location_vbox.pack_start(hosts_box, False, False, 0) + + # Store reference to hosts_box for expand/collapse + setattr(location_frame, 'hosts_box', hosts_box) + setattr(location_frame, 'location', location) + + # Set initial visibility + expanded = self.location_expanded.get(location_key, True) + hosts_box.set_visible(expanded) + + return location_frame + + def _create_location_header(self, location): + """Create the location header row.""" + row = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=8) + row.set_margin_start(4) + row.set_margin_end(4) + row.set_margin_top(4) + row.set_margin_bottom(2) + + # Location expand/collapse arrow + expand_btn = Gtk.Button() + expand_btn.set_relief(Gtk.ReliefStyle.NONE) + expand_btn.set_can_focus(False) + expand_btn.set_size_request(20, 20) + + # Set initial arrow direction + expanded = self.location_expanded.get(location.name, True) + expand_btn.set_label("▼" if expanded else "▶") + + # Connect to toggle function + expand_btn.connect( + "clicked", lambda btn: self._toggle_location_expansion(location, btn)) + row.pack_start(expand_btn, False, False, 0) + + # Location info + location_info = Gtk.Box( + orientation=Gtk.Orientation.VERTICAL, spacing=0) + + # Location name + name_label = Gtk.Label() + escaped_location_name = escape_markup(location.name) + name_label.set_markup(f"{escaped_location_name}") + name_label.set_halign(Gtk.Align.START) + location_info.pack_start(name_label, False, False, 0) + + # External addresses (small text) + if location.external_addresses: + addr_text = ", ".join( + location.external_addresses[:2]) # Show first 2 + if len(location.external_addresses) > 2: + addr_text += f" (+{len(location.external_addresses) - 2} more)" + addr_label = Gtk.Label() + addr_label.set_markup(f"{addr_text}") + addr_label.set_halign(Gtk.Align.START) + location_info.pack_start(addr_label, False, False, 0) + + row.pack_start(location_info, True, True, 0) + + # VPN Status + status_label = Gtk.Label() + if location.connected: + status_label.set_markup( + "Connected") + else: + status_label.set_markup( + "Disconnected") + row.pack_start(status_label, False, False, 0) + + # Action icons + self._add_location_action_icons(row, location) + + return row + + def _create_host_row(self, host, location, is_vm=False): + """Create a host row with aligned IP addresses.""" + row = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=8) + margin = 16 if not is_vm else 32 # VMs are more indented + row.set_margin_start(margin) + row.set_margin_end(4) + row.set_margin_top(1) + row.set_margin_bottom(1) + + # Host icon - custom or fallback to Material Icons + icon_widget = IconLoader.get_host_icon_widget(host, size=20) + icon_container = Gtk.Box() + icon_container.set_size_request(24, 24) # Fixed size for alignment + icon_container.set_center_widget(icon_widget) + row.pack_start(icon_container, False, False, 0) + + # Host name (fixed width for alignment) + escaped_host_name = escape_markup(host.name) + name_markup = f"{escaped_host_name}" if is_vm else f"{escaped_host_name}" + name_label = Gtk.Label() + name_label.set_markup(name_markup) + name_label.set_halign(Gtk.Align.START) + name_label.set_size_request(150, -1) # Fixed width to align IPs + row.pack_start(name_label, False, False, 0) + + # IP address (aligned in middle) + ip_label = Gtk.Label() + ip_label.set_markup(f"{host.get_ip_display()}") + ip_label.set_size_request(120, -1) # Fixed width for alignment + ip_label.set_halign(Gtk.Align.CENTER) + ip_label.set_tooltip_text(", ".join(host.get_all_ips()) if len( + host.ip_addresses) > 1 else host.get_primary_ip()) + row.pack_start(ip_label, False, False, 0) + + # Spacer to push service icons to the right + spacer = Gtk.Box() + row.pack_start(spacer, True, True, 0) + + # Service action icons + self._add_host_service_icons(row, host, location) + + return row + + def _add_customer_service_icons(self, row): + """Add customer service icons to the right side.""" + # Service type to icon mapping + service_icons = { + 'Email & Office': '📧', # O365 + 'Phone System': '📞', # PBX + 'CRM': '👥', # Salesforce + 'Email': '📧', + 'Office': '📄', + } + + # Add icons for each service + for service in self.customer.services[:4]: # Limit to 4 icons + icon = service_icons.get(service.service_type, '🌐') + + btn = Gtk.Button() + btn.set_label(icon) + btn.set_relief(Gtk.ReliefStyle.NONE) + btn.set_can_focus(False) + btn.set_size_request(24, 24) + btn.set_tooltip_text(f"Open {service.name}") + btn.connect("clicked", lambda b, + s=service: self.callbacks['open_customer_service'](s)) + row.pack_start(btn, False, False, 0) + + # Menu button (always last) + menu_btn = self._create_menu_button() + row.pack_start(menu_btn, False, False, 0) + + def _add_location_action_icons(self, row, location): + """Add location action icons.""" + # Connection toggle + connect_icon = "🔌" if not location.connected else "🔓" + connect_btn = Gtk.Button() + connect_btn.set_label(connect_icon) + connect_btn.set_relief(Gtk.ReliefStyle.NONE) + connect_btn.set_can_focus(False) + connect_btn.set_size_request(24, 24) + tooltip = "Connect to VPN" if not location.connected else "Disconnect VPN" + connect_btn.set_tooltip_text(tooltip) + connect_btn.connect( + "clicked", lambda b: self.callbacks['toggle_connection'](location)) + row.pack_start(connect_btn, False, False, 0) + + # Refresh/reload + refresh_btn = Gtk.Button() + refresh_btn.set_label("🔄") + refresh_btn.set_relief(Gtk.ReliefStyle.NONE) + refresh_btn.set_can_focus(False) + refresh_btn.set_size_request(24, 24) + refresh_btn.set_tooltip_text("Refresh connection") + row.pack_start(refresh_btn, False, False, 0) + + # Menu + menu_btn = self._create_menu_button() + row.pack_start(menu_btn, False, False, 0) + + def _add_host_service_icons(self, row, host, location): + """Add host service icons with reachability check.""" + # Service type to Material Icons mapping + # Icon names from: https://fonts.google.com/icons + service_icons = { + 'SSH': 'terminal', # Terminal icon for SSH + 'Web GUI': 'language', # Globe icon for web + 'RDP': 'desktop_windows', # Desktop icon for RDP + 'VNC': 'monitor', # Monitor icon for VNC + 'SMB': 'folder_shared', # Shared folder for SMB + 'Database': 'storage', # Database/storage icon + 'FTP': 'cloud_upload' # Upload icon for FTP + } + + # Add icons for services + for service in host.services[:3]: # Limit to 3 service icons + # Default to settings icon + icon = service_icons.get(service.service_type.value, 'settings') + + # Check if service is reachable + is_reachable = location.is_service_reachable(host, service) + is_external = location.get_external_url_for_service( + host, service) is not None + + btn = Gtk.Button() + btn.set_label(icon) # Material Icons uses ligatures + btn.set_relief(Gtk.ReliefStyle.NONE) + btn.set_can_focus(False) + btn.set_size_request(24, 24) + + # Apply color styling based on reachability + if is_reachable: + # Green for accessible + btn.get_style_context().add_class("service-icon-accessible") + if is_external and not location.connected: + external_url = location.get_external_url_for_service( + host, service) + btn.set_tooltip_text( + f"{service.service_type.value}: {service.name}\nExternal: {external_url}") + else: + btn.set_tooltip_text( + f"{service.service_type.value}: {service.name}") + else: + # Red for not accessible + btn.get_style_context().add_class("service-icon-inaccessible") + btn.set_tooltip_text( + f"{service.service_type.value}: {service.name}\nNot reachable (VPN disconnected)") + + # Enable/disable based on reachability + btn.set_sensitive(is_reachable) + + # Connect click handler only if reachable + if is_reachable: + btn.connect("clicked", lambda b, + s=service: self.callbacks['open_service'](s)) + + row.pack_start(btn, False, False, 0) + + # Menu button + menu_btn = self._create_menu_button() + row.pack_start(menu_btn, False, False, 0) + + def _create_menu_button(self): + """Create a menu button with empty popup.""" + menu_btn = Gtk.MenuButton() + menu_btn.set_label("⋯") # Three dots menu + menu_btn.set_relief(Gtk.ReliefStyle.NONE) + menu_btn.set_can_focus(False) + menu_btn.set_size_request(24, 24) + + # Create empty menu for now + menu = Gtk.Menu() + placeholder_item = Gtk.MenuItem(label="(Empty menu)") + placeholder_item.set_sensitive(False) + menu.append(placeholder_item) + menu.show_all() + + menu_btn.set_popup(menu) + return menu_btn + + def _update_expand_button(self): + """Update the expand button arrow direction.""" + if self.expanded: + self.expand_button.set_label("▼") + else: + self.expand_button.set_label("▶") + + def _toggle_location_expansion(self, location, button): + """Toggle the expansion state of a specific location.""" + location_key = location.name + current_state = self.location_expanded.get(location_key, True) + new_state = not current_state + self.location_expanded[location_key] = new_state + + # Update button arrow + button.set_label("▼" if new_state else "▶") + + # Find the location card and toggle its hosts box visibility + for widget in self.content_box.get_children(): + if hasattr(widget, 'location') and widget.location.name == location_key: + hosts_box = getattr(widget, 'hosts_box', None) + if hosts_box: + hosts_box.set_visible(new_state) + break + + def _on_expand_toggle(self, button): + """Toggle the expanded state.""" + self.expanded = not self.expanded + self._update_expand_button() + self.content_box.set_visible(self.expanded) diff --git a/widgets/customer_card.py b/widgets/customer_card.py deleted file mode 100644 index 45c746b..0000000 --- a/widgets/customer_card.py +++ /dev/null @@ -1,114 +0,0 @@ -import gi -gi.require_version('Gtk', '3.0') -from gi.repository import Gtk -from .location_card import ActiveLocationCard, InactiveLocationCard - - -class ActiveCustomerCard: - def __init__(self, customer, callbacks): - self.customer = customer - self.callbacks = callbacks - self.widget = self._create_widget() - - def _create_widget(self): - # GNOME-style card container - card_frame = Gtk.Frame() - card_frame.get_style_context().add_class("card") - card_frame.set_shadow_type(Gtk.ShadowType.NONE) # Shadow handled by CSS - - card_vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=12) - card_frame.add(card_vbox) - - # Customer header - customer_label = Gtk.Label() - customer_label.set_markup(f"🏢 {self.customer.name}") - customer_label.set_halign(Gtk.Align.START) - card_vbox.pack_start(customer_label, False, False, 0) - - # Customer services section - if self.customer.services: - services_label = Gtk.Label() - services_label.set_markup("Cloud Services") - services_label.set_halign(Gtk.Align.START) - services_label.set_margin_top(8) - card_vbox.pack_start(services_label, False, False, 0) - - # Services box with indent - services_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=8) - services_box.set_margin_start(16) - services_box.set_margin_bottom(8) - card_vbox.pack_start(services_box, False, False, 0) - - for service in self.customer.services: - service_btn = Gtk.Button(label=service.name) - service_btn.get_style_context().add_class("suggested-action") - service_btn.connect("clicked", lambda btn, s=service: self.callbacks['open_customer_service'](s)) - services_box.pack_start(service_btn, False, False, 0) - - # Locations section - for i, location in enumerate(self.customer.locations): - if i > 0: # Add separator between locations - separator = Gtk.Separator(orientation=Gtk.Orientation.HORIZONTAL) - separator.set_margin_top(8) - separator.set_margin_bottom(8) - card_vbox.pack_start(separator, False, False, 0) - - location_card = ActiveLocationCard(location, self.customer.name, self.callbacks) - card_vbox.pack_start(location_card.widget, False, False, 0) - - return card_frame - - -class InactiveCustomerCard: - def __init__(self, customer, callbacks): - self.customer = customer - self.callbacks = callbacks - self.widget = self._create_widget() - - def _create_widget(self): - # GNOME-style card container - card_frame = Gtk.Frame() - card_frame.get_style_context().add_class("card") - card_frame.set_shadow_type(Gtk.ShadowType.NONE) # Shadow handled by CSS - - card_vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=12) - card_frame.add(card_vbox) - - # Customer header - muted - customer_label = Gtk.Label() - customer_label.set_markup(f"🏢 {self.customer.name}") - customer_label.set_halign(Gtk.Align.START) - card_vbox.pack_start(customer_label, False, False, 0) - - # Customer services section - list format for inactive - if self.customer.services: - services_label = Gtk.Label() - services_label.set_markup("Cloud Services") - services_label.set_halign(Gtk.Align.START) - services_label.set_margin_top(8) - card_vbox.pack_start(services_label, False, False, 0) - - # Services list with indent - services_vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=2) - services_vbox.set_margin_start(16) - services_vbox.set_margin_bottom(8) - card_vbox.pack_start(services_vbox, False, False, 0) - - for service in self.customer.services: - service_label = Gtk.Label() - service_label.set_markup(f"• {service.name} ({service.service_type})") - service_label.set_halign(Gtk.Align.START) - services_vbox.pack_start(service_label, False, False, 0) - - # Locations section - for i, location in enumerate(self.customer.locations): - if i > 0: # Add separator between locations - separator = Gtk.Separator(orientation=Gtk.Orientation.HORIZONTAL) - separator.set_margin_top(8) - separator.set_margin_bottom(8) - card_vbox.pack_start(separator, False, False, 0) - - location_card = InactiveLocationCard(location, self.customer.name, self.callbacks) - card_vbox.pack_start(location_card.widget, False, False, 0) - - return card_frame \ No newline at end of file diff --git a/widgets/host_item.py b/widgets/host_item.py index 980c7a9..8390420 100644 --- a/widgets/host_item.py +++ b/widgets/host_item.py @@ -2,11 +2,18 @@ import gi gi.require_version('Gtk', '3.0') from gi.repository import Gtk from models import ServiceType, HostType +from utils import IconLoader + + +def escape_markup(text: str) -> str: + """Escape special characters for Pango markup.""" + return text.replace('&', '&').replace('<', '<').replace('>', '>') class HostItem: - def __init__(self, host, open_service_callback): + def __init__(self, host, location, open_service_callback): self.host = host + self.location = location self.open_service_callback = open_service_callback self.widget = self._create_widget() @@ -18,20 +25,12 @@ class HostItem: host_header = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=8) host_box.pack_start(host_header, False, False, 0) - # Host type icon - type_icons = { - HostType.LINUX: "🐧", - HostType.WINDOWS: "🪟", - HostType.WINDOWS_SERVER: "🖥️", - HostType.PROXMOX: "📦", - HostType.ESXI: "📦", - HostType.ROUTER: "🌐", - HostType.SWITCH: "🔗" - } - icon = type_icons.get(self.host.host_type, "💻") - - icon_label = Gtk.Label(label=icon) - host_header.pack_start(icon_label, False, False, 0) + # Host icon - custom or fallback to Material Icons + icon_widget = IconLoader.get_host_icon_widget(self.host, size=24) + icon_container = Gtk.Box() + icon_container.set_size_request(32, 24) # Fixed size + icon_container.set_center_widget(icon_widget) + host_header.pack_start(icon_container, False, False, 0) # Host details - compact single line details_vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=1) @@ -39,8 +38,14 @@ class HostItem: # Host name with IP inline name_label = Gtk.Label() - name_label.set_markup(f"{self.host.name} ({self.host.host_type.value}) - {self.host.ip_address}") + ip_display = self.host.get_ip_display() + escaped_host_name = escape_markup(self.host.name) + escaped_host_type = escape_markup(self.host.host_type.value) + escaped_ip_display = escape_markup(ip_display) + name_label.set_markup(f"{escaped_host_name} ({escaped_host_type}) - {escaped_ip_display}") name_label.set_halign(Gtk.Align.START) + if len(self.host.ip_addresses) > 1: + name_label.set_tooltip_text(f"All IPs: {', '.join(self.host.get_all_ips())}") details_vbox.pack_start(name_label, False, False, 0) # Services section - compact button row @@ -52,9 +57,35 @@ class HostItem: for service in self.host.services: if service.service_type in [ServiceType.WEB_GUI, ServiceType.SSH, ServiceType.RDP]: # Only show launchable services + # Check if service is reachable + is_reachable = self.location.is_service_reachable(self.host, service) + is_external = self.location.get_external_url_for_service(self.host, service) is not None + service_btn = Gtk.Button(label=service.service_type.value) - service_btn.get_style_context().add_class("suggested-action") - service_btn.connect("clicked", lambda btn, s=service: self._on_service_clicked(s)) + + # Apply color-based styling + if is_reachable: + # Green styling for accessible services + service_btn.get_style_context().add_class("suggested-action") + service_btn.set_name("service-btn-accessible") + if is_external and not self.location.connected: + external_url = self.location.get_external_url_for_service(self.host, service) + service_btn.set_tooltip_text(f"Open {service.name}\nExternal: {external_url}") + else: + service_btn.set_tooltip_text(f"Open {service.name}") + else: + # Red styling for inaccessible services + service_btn.get_style_context().add_class("destructive-action") + service_btn.set_name("service-btn-inaccessible") + service_btn.set_tooltip_text(f"{service.name} - Not reachable (VPN disconnected)") + + # Enable/disable based on reachability + service_btn.set_sensitive(is_reachable) + + # Connect handler only if reachable + if is_reachable: + service_btn.connect("clicked", lambda btn, s=service: self._on_service_clicked(s)) + services_box.pack_start(service_btn, False, False, 0) # Sub-hosts (VMs) section @@ -71,7 +102,7 @@ class HostItem: host_box.pack_start(subhosts_box, False, False, 0) for subhost in self.host.sub_hosts: - subhost_item = HostItem(subhost, self.open_service_callback) + subhost_item = HostItem(subhost, self.location, self.open_service_callback) subhosts_box.pack_start(subhost_item.widget, False, False, 0) return host_box diff --git a/widgets/inactive_customer_card.py b/widgets/inactive_customer_card.py new file mode 100644 index 0000000..f452b11 --- /dev/null +++ b/widgets/inactive_customer_card.py @@ -0,0 +1,118 @@ +from .location_card import InactiveLocationCard +from gi.repository import Gtk +import gi +gi.require_version('Gtk', '3.0') + + +def escape_markup(text: str) -> str: + """Escape special characters for Pango markup.""" + return text.replace('&', '&').replace('<', '<').replace('>', '>') + + +class InactiveCustomerCard: + def __init__(self, customer, callbacks): + self.customer = customer + self.callbacks = callbacks + self.expanded = False # Start collapsed by default for inactive + self.widget = self._create_widget() + + def _create_widget(self): + # GNOME-style card container + card_frame = Gtk.Frame() + card_frame.get_style_context().add_class("card") + card_frame.set_shadow_type( + Gtk.ShadowType.NONE) # Shadow handled by CSS + + card_vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=12) + card_frame.add(card_vbox) + + # Customer header with expand/collapse button - muted + header_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=8) + card_vbox.pack_start(header_box, False, False, 0) + + # Expand/collapse arrow button + self.expand_button = Gtk.Button() + self.expand_button.set_relief(Gtk.ReliefStyle.NONE) + self.expand_button.set_can_focus(False) + self._update_expand_button() + self.expand_button.connect("clicked", self._on_expand_toggle) + header_box.pack_start(self.expand_button, False, False, 0) + + # Customer name - muted + customer_label = Gtk.Label() + escaped_name = escape_markup(self.customer.name) + customer_label.set_markup( + f"🏢 {escaped_name}") + customer_label.set_halign(Gtk.Align.START) + header_box.pack_start(customer_label, False, False, 0) + + # Location count badge + inactive_count = len(self.customer.locations) + if inactive_count > 0: + count_label = Gtk.Label() + count_label.set_markup( + f"({inactive_count})") + header_box.pack_start(count_label, False, False, 0) + + # Content container (collapsible) + self.content_box = Gtk.Box( + orientation=Gtk.Orientation.VERTICAL, spacing=12) + self.content_box.set_visible(self.expanded) # Start hidden + card_vbox.pack_start(self.content_box, False, False, 0) + + # Customer services section - list format for inactive + if self.customer.services: + services_label = Gtk.Label() + services_label.set_markup( + "Cloud Services") + services_label.set_halign(Gtk.Align.START) + services_label.set_margin_top(8) + self.content_box.pack_start(services_label, False, False, 0) + + # Services list with indent + services_vbox = Gtk.Box( + orientation=Gtk.Orientation.VERTICAL, spacing=2) + services_vbox.set_margin_start(16) + services_vbox.set_margin_bottom(8) + self.content_box.pack_start(services_vbox, False, False, 0) + + for service in self.customer.services: + service_label = Gtk.Label() + # Escape special characters in markup text + escaped_name = escape_markup(service.name) + escaped_type = escape_markup(service.service_type) + service_label.set_markup( + f"• {escaped_name} ({escaped_type})") + service_label.set_halign(Gtk.Align.START) + services_vbox.pack_start(service_label, False, False, 0) + + # Locations section + for i, location in enumerate(self.customer.locations): + if i > 0: # Add separator between locations + separator = Gtk.Separator( + orientation=Gtk.Orientation.HORIZONTAL) + separator.set_margin_top(8) + separator.set_margin_bottom(8) + self.content_box.pack_start(separator, False, False, 0) + + location_card = InactiveLocationCard( + location, self.customer.name, self.callbacks) + self.content_box.pack_start(location_card.widget, False, False, 0) + + # Show all content in the box (but box itself may be hidden) + self.content_box.show_all() + + return card_frame + + def _update_expand_button(self): + """Update the expand button arrow direction.""" + if self.expanded: + self.expand_button.set_label("▼") + else: + self.expand_button.set_label("▶") + + def _on_expand_toggle(self, button): + """Toggle the expanded state.""" + self.expanded = not self.expanded + self._update_expand_button() + self.content_box.set_visible(self.expanded) diff --git a/widgets/location_card.py b/widgets/location_card.py index 5f9167a..4b90389 100644 --- a/widgets/location_card.py +++ b/widgets/location_card.py @@ -1,8 +1,13 @@ +from models import VPNType +from .host_item import HostItem +from gi.repository import Gtk import gi gi.require_version('Gtk', '3.0') -from gi.repository import Gtk -from .host_item import HostItem -from models import VPNType + + +def escape_markup(text: str) -> str: + """Escape special characters for Pango markup.""" + return text.replace('&', '&').replace('<', '<').replace('>', '>') class ActiveLocationCard: @@ -11,49 +16,78 @@ class ActiveLocationCard: self.customer_name = customer_name self.callbacks = callbacks self.widget = self._create_widget() - + def _create_widget(self): # Clean card layout - just a box with proper spacing - location_vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=8) - + location_vbox = Gtk.Box( + orientation=Gtk.Orientation.VERTICAL, spacing=8) + # Location header with controls - header_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=12) + header_box = Gtk.Box( + orientation=Gtk.Orientation.HORIZONTAL, spacing=12) location_vbox.pack_start(header_box, False, False, 0) - + # Location info info_vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=2) header_box.pack_start(info_vbox, True, True, 0) - + # Location name with VPN type location_label = Gtk.Label() - location_label.set_markup(f"📍 {self.location.name}") + escaped_location_name = escape_markup(self.location.name) + location_label.set_markup(f"📍 {escaped_location_name}") location_label.set_halign(Gtk.Align.START) info_vbox.pack_start(location_label, False, False, 0) - - # VPN type + + # VPN type and external address vpn_icons = { VPNType.OPENVPN: "🔒", VPNType.WIREGUARD: "⚡", VPNType.IPSEC: "🛡️" } vpn_icon = vpn_icons.get(self.location.vpn_type, "🔑") - + + type_text = f"{vpn_icon} {self.location.vpn_type.value} VPN" + if self.location.external_addresses: + if len(self.location.external_addresses) == 1: + type_text += f" • 🌐 {self.location.external_addresses[0]}" + else: + type_text += f" • 🌐 {len(self.location.external_addresses)} endpoints" + type_label = Gtk.Label() - type_label.set_markup(f"{vpn_icon} {self.location.vpn_type.value} VPN") + type_label.set_markup(f"{type_text}") type_label.set_halign(Gtk.Align.START) info_vbox.pack_start(type_label, False, False, 0) - + + # External addresses and networks if available + if self.location.external_addresses and len(self.location.external_addresses) > 1: + # Show full list if more than one + addresses_text = "🌐 External: " + \ + ", ".join(self.location.external_addresses) + addresses_label = Gtk.Label() + addresses_label.set_markup(f"{addresses_text}") + addresses_label.set_halign(Gtk.Align.START) + info_vbox.pack_start(addresses_label, False, False, 0) + + if self.location.networks: + networks_text = "📡 Networks: " + ", ".join(self.location.networks) + networks_label = Gtk.Label() + networks_label.set_markup(f"{networks_text}") + networks_label.set_halign(Gtk.Align.START) + info_vbox.pack_start(networks_label, False, False, 0) + # Status and controls - controls_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=8) + controls_box = Gtk.Box( + orientation=Gtk.Orientation.HORIZONTAL, spacing=8) header_box.pack_end(controls_box, False, False, 0) - + # Status status_text = "● Connected" if self.location.connected else "○ Disconnected" status_color = "#4caf50" if self.location.connected else "#999" status_label = Gtk.Label() - status_label.set_markup(f"{status_text}") + status_label.set_markup( + f"{status_text}") controls_box.pack_start(status_label, False, False, 0) - + # Connect/Disconnect button btn_text = "Disconnect" if self.location.connected else "Connect" connect_btn = Gtk.Button(label=btn_text) @@ -63,14 +97,14 @@ class ActiveLocationCard: connect_btn.get_style_context().add_class("suggested-action") connect_btn.connect("clicked", self._on_connect_clicked) controls_box.pack_start(connect_btn, False, False, 0) - + # X button to deactivate (close button style) close_btn = Gtk.Button(label="✕") close_btn.set_tooltip_text("Deactivate location") close_btn.get_style_context().add_class("circular") close_btn.connect("clicked", self._on_deactivate_clicked) controls_box.pack_start(close_btn, False, False, 0) - + # Hosts section if available if self.location.hosts: hosts_label = Gtk.Label() @@ -78,23 +112,26 @@ class ActiveLocationCard: hosts_label.set_halign(Gtk.Align.START) hosts_label.set_margin_top(8) location_vbox.pack_start(hosts_label, False, False, 0) - + # Hosts box with indent - hosts_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=8) + hosts_box = Gtk.Box( + orientation=Gtk.Orientation.VERTICAL, spacing=8) hosts_box.set_margin_start(16) location_vbox.pack_start(hosts_box, False, False, 0) - + for host in self.location.hosts: - host_item = HostItem(host, self.callbacks['open_service']) + host_item = HostItem(host, self.location, + self.callbacks['open_service']) hosts_box.pack_start(host_item.widget, False, False, 0) - + return location_vbox - + def _on_connect_clicked(self, button): self.callbacks['toggle_connection'](self.location) - + def _on_deactivate_clicked(self, button): - self.callbacks['deactivate_location'](self.location, self.customer_name) + self.callbacks['deactivate_location']( + self.location, self.customer_name) class InactiveLocationCard: @@ -103,54 +140,68 @@ class InactiveLocationCard: self.customer_name = customer_name self.callbacks = callbacks self.widget = self._create_widget() - + def _create_widget(self): # Clean horizontal layout - location_hbox = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=12) - + location_hbox = Gtk.Box( + orientation=Gtk.Orientation.HORIZONTAL, spacing=12) + # Location info info_vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=2) location_hbox.pack_start(info_vbox, True, True, 0) - + # Location name location_label = Gtk.Label() - location_label.set_markup(f"📍 {self.location.name}") + escaped_location_name = escape_markup(self.location.name) + location_label.set_markup(f"📍 {escaped_location_name}") location_label.set_halign(Gtk.Align.START) info_vbox.pack_start(location_label, False, False, 0) - - # VPN type and host count + + # VPN type, external address and host count vpn_icons = { VPNType.OPENVPN: "🔒", - VPNType.WIREGUARD: "⚡", + VPNType.WIREGUARD: "⚡", VPNType.IPSEC: "🛡️" } vpn_icon = vpn_icons.get(self.location.vpn_type, "🔑") host_count = len(self.location.hosts) - + + details_text = f"{vpn_icon} {self.location.vpn_type.value} VPN • {host_count} hosts" + if self.location.external_addresses: + if len(self.location.external_addresses) == 1: + details_text += f" • 🌐 {self.location.external_addresses[0]}" + else: + details_text += f" • 🌐 {len(self.location.external_addresses)} endpoints" + if self.location.networks: + network_count = len(self.location.networks) + details_text += f" • {network_count} network{'s' if network_count > 1 else ''}" + details_label = Gtk.Label() - details_label.set_markup(f"{vpn_icon} {self.location.vpn_type.value} VPN • {host_count} hosts") + details_label.set_markup(f"{details_text}") details_label.set_halign(Gtk.Align.START) info_vbox.pack_start(details_label, False, False, 0) - + # Button box for multiple buttons button_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=4) location_hbox.pack_end(button_box, False, False, 0) - + # Set as Current button current_btn = Gtk.Button(label="Set as Current") current_btn.connect("clicked", self._on_set_current_clicked) button_box.pack_start(current_btn, False, False, 0) - + # Activate button activate_btn = Gtk.Button(label="Set Active") activate_btn.get_style_context().add_class("suggested-action") activate_btn.connect("clicked", self._on_activate_clicked) button_box.pack_start(activate_btn, False, False, 0) - + return location_hbox - + def _on_activate_clicked(self, button): - self.callbacks['set_location_active'](self.location, self.customer_name) - + self.callbacks['set_location_active']( + self.location, self.customer_name) + def _on_set_current_clicked(self, button): - self.callbacks['set_current_location'](self.location, self.customer_name) \ No newline at end of file + self.callbacks['set_current_location']( + self.location, self.customer_name)