This commit is contained in:
2025-09-07 23:33:55 +02:00
parent d918f1e497
commit fbacfde9f2
33 changed files with 2626 additions and 1236 deletions

View File

@@ -1,17 +1,8 @@
"""Services package for VPN and password management."""
from .vpn_manager import VPNManager, VPNConnectionError, VPNStatus, VPNConnection
from .passbolt_client import PassboltClient, PassboltError, PassboltCredential
from .connection_manager import ConnectionManager, ConnectionConfig
from .vpn_manager import VPNManager, VPNConnectionError, VPNStatus
__all__ = [
'VPNManager',
'VPNConnection',
'VPNConnectionError',
'VPNStatus',
'PassboltClient',
'PassboltCredential',
'PassboltError',
'ConnectionManager',
'ConnectionConfig',
]
'VPNConnectionError',
'VPNStatus'
]

View File

@@ -1,266 +0,0 @@
"""High-level connection manager that integrates VPN and Passbolt."""
import logging
from typing import Optional, Dict, Any
from pathlib import Path
from dataclasses import dataclass
from .vpn_manager import VPNManager, VPNStatus, VPNConnectionError
from .passbolt_client import PassboltClient, PassboltError
logger = logging.getLogger(__name__)
@dataclass
class ConnectionConfig:
"""Configuration for a VPN connection."""
name: str
vpn_config_path: str
nmcli_connection_name: Optional[str] = None
auto_import: bool = True # Auto-import .ovpn if not in NetworkManager
# Credentials can be:
# - Passbolt UUID string (for future implementation)
# - Dict with 'username' and 'password' keys
# - None if no credentials needed
vpn_credentials: Optional[dict | str] = None
class ConnectionManager:
"""Manages VPN connections with Passbolt credential integration."""
def __init__(self, use_passbolt: bool = True):
"""Initialize the connection manager.
Args:
use_passbolt: Whether to use Passbolt for credentials
"""
self.vpn_manager = VPNManager()
self.passbolt_client = None
if use_passbolt:
try:
self.passbolt_client = PassboltClient()
logger.info("Passbolt client initialized successfully")
except PassboltError as e:
logger.warning(f"Passbolt not available: {e}")
logger.info("Falling back to manual credential entry")
def connect_location(self, config: ConnectionConfig,
username: Optional[str] = None,
password: Optional[str] = None) -> None:
"""Connect to a VPN location.
Args:
config: Connection configuration
username: Override username (if not using Passbolt)
password: Override password (if not using Passbolt)
"""
# Ensure connection exists in NetworkManager
connection_name = self._ensure_connection(config)
# Get credentials - check overrides first, then config
if not username or not password:
creds_username, creds_password = self._get_credentials_from_config(config)
username = username or creds_username
password = password or creds_password
if not username or not password:
logger.info(f"No credentials provided for {connection_name}")
# nmcli will prompt for credentials
# Connect
try:
logger.info(f"Connecting to {connection_name}")
self.vpn_manager.connect(connection_name, username, password)
logger.info(f"Successfully connected to {connection_name}")
except VPNConnectionError as e:
logger.error(f"Failed to connect to {connection_name}: {e}")
raise
def disconnect_location(self, config: ConnectionConfig) -> None:
"""Disconnect from a VPN location.
Args:
config: Connection configuration
"""
connection_name = config.nmcli_connection_name or config.name
if not self.vpn_manager.connection_exists(connection_name):
logger.warning(f"Connection {connection_name} does not exist")
return
try:
logger.info(f"Disconnecting from {connection_name}")
self.vpn_manager.disconnect(connection_name)
logger.info(f"Successfully disconnected from {connection_name}")
except VPNConnectionError as e:
logger.error(f"Failed to disconnect from {connection_name}: {e}")
raise
def get_connection_status(self, config: ConnectionConfig) -> VPNStatus:
"""Get the status of a VPN connection.
Args:
config: Connection configuration
Returns:
Current VPN status
"""
connection_name = config.nmcli_connection_name or config.name
if not self.vpn_manager.connection_exists(connection_name):
return VPNStatus.DISCONNECTED
return self.vpn_manager.get_status(connection_name)
def _ensure_connection(self, config: ConnectionConfig) -> str:
"""Ensure VPN connection exists in NetworkManager.
Args:
config: Connection configuration
Returns:
Name of the NetworkManager connection
"""
connection_name = config.nmcli_connection_name or config.name
# Check if connection already exists
if self.vpn_manager.connection_exists(connection_name):
logger.debug(f"Connection {connection_name} already exists")
return connection_name
# Import if auto_import is enabled and config file exists
if config.auto_import and config.vpn_config_path:
vpn_file = Path(config.vpn_config_path)
if vpn_file.exists():
logger.info(f"Importing VPN configuration from {vpn_file}")
imported_name = self.vpn_manager.import_ovpn(
str(vpn_file),
connection_name
)
logger.info(f"Imported connection as {imported_name}")
return imported_name
else:
raise VPNConnectionError(
f"VPN config file not found: {config.vpn_config_path}"
)
raise VPNConnectionError(
f"Connection {connection_name} does not exist and auto-import is disabled"
)
def _get_credentials_from_config(self, config: ConnectionConfig) -> tuple[Optional[str], Optional[str]]:
"""Get credentials from the configuration.
Args:
config: Connection configuration
Returns:
Tuple of (username, password) or (None, None)
"""
if not config.vpn_credentials:
return None, None
# If it's a dict with username/password
if isinstance(config.vpn_credentials, dict):
username = config.vpn_credentials.get('username')
password = config.vpn_credentials.get('password')
return username, password
# If it's a string (Passbolt UUID for future use)
if isinstance(config.vpn_credentials, str):
# For now, try to use Passbolt if available
if self.passbolt_client:
try:
return self._get_passbolt_credentials(config.vpn_credentials)
except (PassboltError, ValueError) as e:
logger.warning(f"Failed to get Passbolt credentials: {e}")
else:
logger.warning(f"Passbolt UUID provided but Passbolt client not available")
return None, None
def _get_passbolt_credentials(self, resource_id: str) -> tuple[str, str]:
"""Get credentials from Passbolt.
Args:
resource_id: Passbolt resource UUID
Returns:
Tuple of (username, password)
"""
if not self.passbolt_client:
raise ValueError("Passbolt client not initialized")
try:
credential = self.passbolt_client.get_credential(resource_id)
if not credential.username or not credential.password:
raise ValueError(
f"Incomplete credentials for resource {resource_id}")
return credential.username, credential.password
except PassboltError as e:
logger.error(f"Failed to get Passbolt credentials: {e}")
raise
def validate_passbolt_resource(self, resource_id: str) -> bool:
"""Validate that a Passbolt resource exists and has required fields.
Args:
resource_id: Passbolt resource UUID
Returns:
True if resource is valid for VPN use
"""
if not self.passbolt_client:
return False
try:
credential = self.passbolt_client.get_credential(resource_id)
return bool(credential.username and credential.password)
except PassboltError:
return False
def import_all_configs(self, configs: list[ConnectionConfig]) -> Dict[str, bool]:
"""Import multiple VPN configurations.
Args:
configs: List of connection configurations
Returns:
Dictionary mapping connection names to success status
"""
results = {}
for config in configs:
try:
connection_name = self._ensure_connection(config)
results[connection_name] = True
logger.info(f"Successfully imported {connection_name}")
except VPNConnectionError as e:
results[config.name] = False
logger.error(f"Failed to import {config.name}: {e}")
return results
def cleanup_connection(self, config: ConnectionConfig,
remove_from_nm: bool = False) -> None:
"""Clean up a VPN connection.
Args:
config: Connection configuration
remove_from_nm: Whether to remove from NetworkManager
"""
connection_name = config.nmcli_connection_name or config.name
# Disconnect if connected
if self.get_connection_status(config) == VPNStatus.CONNECTED:
self.disconnect_location(config)
# Remove from NetworkManager if requested
if remove_from_nm and self.vpn_manager.connection_exists(connection_name):
logger.info(f"Removing {connection_name} from NetworkManager")
self.vpn_manager.delete_connection(connection_name)

View File

@@ -1,369 +0,0 @@
"""Passbolt CLI integration for secure credential management."""
import subprocess
import json
import os
from dataclasses import dataclass
from typing import Optional, List, Dict, Any
from enum import Enum
from pathlib import Path
class PassboltResourceType(Enum):
"""Types of resources in Passbolt."""
PASSWORD = "password"
PASSWORD_WITH_DESCRIPTION = "password-with-description"
PASSWORD_STRING = "password-string"
TOTP = "totp"
class PassboltError(Exception):
"""Exception raised for Passbolt operations."""
pass
@dataclass
class PassboltCredential:
"""Represents credentials retrieved from Passbolt."""
resource_id: str
name: str
username: Optional[str] = None
password: Optional[str] = None
uri: Optional[str] = None
description: Optional[str] = None
resource_type: PassboltResourceType = PassboltResourceType.PASSWORD
@dataclass
class PassboltResource:
"""Represents a Passbolt resource."""
id: str
name: str
username: Optional[str] = None
uri: Optional[str] = None
resource_type: str = "password"
folder_parent_id: Optional[str] = None
personal: bool = False
class PassboltClient:
"""Client for interacting with Passbolt through the CLI."""
def __init__(self, passbolt_cli_path: str = "passbolt"):
"""Initialize Passbolt client.
Args:
passbolt_cli_path: Path to the passbolt CLI executable
"""
self.cli_path = passbolt_cli_path
self._check_cli_available()
self._check_authentication()
def _check_cli_available(self) -> None:
"""Check if Passbolt CLI is available."""
try:
subprocess.run([self.cli_path, '--version'],
capture_output=True, check=True)
except (subprocess.CalledProcessError, FileNotFoundError):
raise PassboltError(
f"Passbolt CLI not found at '{self.cli_path}'. "
"Please install: https://github.com/passbolt/go-passbolt-cli"
)
def _check_authentication(self) -> None:
"""Check if authenticated with Passbolt."""
try:
# Try to list resources to check auth
self._run_passbolt(['list', '--json'], check=True)
except PassboltError:
raise PassboltError(
"Not authenticated with Passbolt. "
"Please run: passbolt auth login"
)
def _run_passbolt(self, args: List[str], check: bool = True) -> subprocess.CompletedProcess:
"""Run a Passbolt CLI command.
Args:
args: Command arguments
check: Whether to check return code
Returns:
Completed process result
"""
try:
result = subprocess.run(
[self.cli_path] + args,
capture_output=True,
text=True,
check=check
)
return result
except subprocess.CalledProcessError as e:
raise PassboltError(f"Passbolt command failed: {e.stderr}")
def get_credential(self, resource_id: str) -> PassboltCredential:
"""Get a credential by resource ID.
Args:
resource_id: UUID of the Passbolt resource
Returns:
PassboltCredential object with username and password
"""
# Get the full resource
result = self._run_passbolt(['get', '--id', resource_id, '--json'])
try:
data = json.loads(result.stdout)
except json.JSONDecodeError:
raise PassboltError(f"Failed to parse Passbolt response")
# Extract fields based on resource type
credential = PassboltCredential(
resource_id=resource_id,
name=data.get('name', ''),
username=data.get('username'),
password=data.get('password'),
uri=data.get('uri'),
description=data.get('description')
)
# Determine resource type
if 'resource_type' in data:
try:
credential.resource_type = PassboltResourceType(
data['resource_type'])
except ValueError:
pass # Keep default
return credential
def get_field(self, resource_id: str, field: str) -> str:
"""Get a specific field from a resource.
Args:
resource_id: UUID of the Passbolt resource
field: Field name (e.g., 'password', 'username', 'uri')
Returns:
Field value as string
"""
result = self._run_passbolt(
['get', '--id', resource_id, '--field', field])
return result.stdout.strip()
def get_password(self, resource_id: str) -> str:
"""Get just the password for a resource.
Args:
resource_id: UUID of the Passbolt resource
Returns:
Password string
"""
return self.get_field(resource_id, 'password')
def get_username(self, resource_id: str) -> str:
"""Get just the username for a resource.
Args:
resource_id: UUID of the Passbolt resource
Returns:
Username string
"""
return self.get_field(resource_id, 'username')
def list_resources(self, folder_id: Optional[str] = None,
search: Optional[str] = None) -> List[PassboltResource]:
"""List available resources.
Args:
folder_id: Optional folder ID to filter by
search: Optional search term
Returns:
List of PassboltResource objects
"""
args = ['list', '--json']
if folder_id:
args.extend(['--folder', folder_id])
if search:
args.extend(['--filter', search])
result = self._run_passbolt(args)
try:
data = json.loads(result.stdout)
except json.JSONDecodeError:
return []
resources = []
for item in data:
resources.append(PassboltResource(
id=item['id'],
name=item.get('name', ''),
username=item.get('username'),
uri=item.get('uri'),
resource_type=item.get('resource_type', 'password'),
folder_parent_id=item.get('folder_parent_id'),
personal=item.get('personal', False)
))
return resources
def find_resource_by_name(self, name: str) -> Optional[PassboltResource]:
"""Find a resource by name.
Args:
name: Name of the resource to find
Returns:
First matching PassboltResource or None
"""
resources = self.list_resources(search=name)
for resource in resources:
if resource.name == name:
return resource
return None
def create_resource(self, name: str, username: str, password: str,
uri: Optional[str] = None,
description: Optional[str] = None,
folder_id: Optional[str] = None) -> str:
"""Create a new password resource.
Args:
name: Resource name
username: Username
password: Password
uri: Optional URI/URL
description: Optional description
folder_id: Optional folder to place resource in
Returns:
ID of created resource
"""
args = ['create', 'resource',
'--name', name,
'--username', username,
'--password', password]
if uri:
args.extend(['--uri', uri])
if description:
args.extend(['--description', description])
if folder_id:
args.extend(['--folder', folder_id])
result = self._run_passbolt(args)
# Parse the ID from output
# Output format: "Resource created: <id>"
for line in result.stdout.split('\n'):
if 'created' in line.lower() and ':' in line:
parts = line.split(':', 1)
if len(parts) == 2:
return parts[1].strip()
raise PassboltError("Failed to parse created resource ID")
def update_resource(self, resource_id: str,
name: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
uri: Optional[str] = None,
description: Optional[str] = None) -> None:
"""Update an existing resource.
Args:
resource_id: ID of resource to update
name: New name (optional)
username: New username (optional)
password: New password (optional)
uri: New URI (optional)
description: New description (optional)
"""
args = ['update', 'resource', '--id', resource_id]
if name:
args.extend(['--name', name])
if username:
args.extend(['--username', username])
if password:
args.extend(['--password', password])
if uri:
args.extend(['--uri', uri])
if description:
args.extend(['--description', description])
self._run_passbolt(args)
def delete_resource(self, resource_id: str) -> None:
"""Delete a resource.
Args:
resource_id: ID of resource to delete
"""
self._run_passbolt(['delete', 'resource', '--id', resource_id])
def share_resource(self, resource_id: str, user_id: str,
permission: str = "read") -> None:
"""Share a resource with another user.
Args:
resource_id: ID of resource to share
user_id: ID of user to share with
permission: Permission level ('read', 'update', 'owner')
"""
self._run_passbolt([
'share', 'resource',
'--id', resource_id,
'--user', user_id,
'--permission', permission
])
def list_folders(self) -> List[Dict[str, Any]]:
"""List all folders.
Returns:
List of folder dictionaries
"""
result = self._run_passbolt(['list', 'folder', '--json'])
try:
return json.loads(result.stdout)
except json.JSONDecodeError:
return []
def get_folder_by_name(self, name: str) -> Optional[Dict[str, Any]]:
"""Find a folder by name.
Args:
name: Folder name to search for
Returns:
Folder dictionary or None
"""
folders = self.list_folders()
for folder in folders:
if folder.get('name') == name:
return folder
return None
def validate_resource_id(self, resource_id: str) -> bool:
"""Check if a resource ID exists and is accessible.
Args:
resource_id: UUID of the resource
Returns:
True if resource exists and is accessible
"""
try:
self._run_passbolt(['get', '--id', resource_id, '--field', 'name'])
return True
except PassboltError:
return False

View File

@@ -1,13 +1,13 @@
"""VPN connection management using NetworkManager (nmcli)."""
"""Enhanced VPN management with VPNTray naming and route control."""
import subprocess
import tempfile
import os
import re
import logging
from dataclasses import dataclass
from typing import Optional, Dict, List
from typing import Optional, List
from enum import Enum
from pathlib import Path
from models import Location
class VPNStatus(Enum):
@@ -26,286 +26,482 @@ class VPNConnectionError(Exception):
@dataclass
class VPNConnection:
"""Represents a NetworkManager VPN connection."""
class VPNConnectionInfo:
"""Information about a VPN connection."""
name: str
uuid: str
type: str
vpntray_name: str # Our custom name with vpntray_ prefix
status: VPNStatus
device: Optional[str] = None
state: VPNStatus = VPNStatus.UNKNOWN
vpn_type: Optional[str] = None # OpenVPN, WireGuard, etc.
routes: List[str] = None # List of routes added
class VPNManager:
"""Manages VPN connections through NetworkManager CLI (nmcli)."""
"""Enhanced VPN manager with VPNTray naming and route management."""
VPNTRAY_PREFIX = "vpntray_"
VPN_CONFIG_DIR = Path.home() / ".vpntray" / "vpn"
def __init__(self):
"""Initialize VPN manager and check for nmcli availability."""
"""Initialize VPN manager."""
self.logger = logging.getLogger(__name__)
self._check_nmcli_available()
self._ensure_vpn_config_dir()
def _check_nmcli_available(self) -> None:
"""Check if nmcli is available on the system."""
"""Check if nmcli is available."""
try:
subprocess.run(['nmcli', '--version'],
capture_output=True, check=True)
except (subprocess.CalledProcessError, FileNotFoundError):
raise VPNConnectionError(
"nmcli is not available. Please install NetworkManager.")
"nmcli is not available. Install NetworkManager.")
def _ensure_vpn_config_dir(self) -> None:
"""Ensure VPN config directory exists."""
self.VPN_CONFIG_DIR.mkdir(parents=True, exist_ok=True)
def _run_nmcli(self, args: List[str], check: bool = True, timeout: int = 30) -> subprocess.CompletedProcess:
"""Run nmcli command with logging and timeout."""
command = ['nmcli'] + args
command_str = ' '.join(command)
def _run_nmcli(self, args: List[str], check: bool = True) -> subprocess.CompletedProcess:
"""Run an nmcli command with error handling."""
try:
result = subprocess.run(
['nmcli'] + args,
command,
capture_output=True,
text=True,
check=check
check=check,
timeout=timeout # Add timeout to prevent hanging
)
self.logger.debug(f"Command: {command_str}")
if result.stdout.strip():
self.logger.debug(f"Output: {result.stdout.strip()}")
if result.stderr.strip():
self.logger.warning(f"Stderr: {result.stderr.strip()}")
if result.returncode == 0:
self.logger.debug("Command completed successfully")
else:
self.logger.error(
f"Command exited with code: {result.returncode}")
return result
except subprocess.TimeoutExpired:
self.logger.error(
f"Command timed out after {timeout}s: {command_str}")
raise VPNConnectionError(
f"nmcli command timed out after {timeout} seconds")
except subprocess.CalledProcessError as e:
raise VPNConnectionError(f"nmcli command failed: {e.stderr}")
def import_ovpn(self, ovpn_path: str, connection_name: Optional[str] = None) -> str:
"""Import an OpenVPN configuration file.
Args:
ovpn_path: Path to the .ovpn configuration file
connection_name: Optional custom name for the connection
Returns:
The name of the imported connection
"""
ovpn_file = Path(ovpn_path)
if not ovpn_file.exists():
self.logger.debug(f"Failed command: {command_str}")
if e.stdout and e.stdout.strip():
self.logger.debug(f"Output: {e.stdout.strip()}")
if e.stderr and e.stderr.strip():
self.logger.error(f"Error: {e.stderr.strip()}")
error_details = e.stderr or str(e)
raise VPNConnectionError(
f"OpenVPN config file not found: {ovpn_path}")
f"nmcli command failed (exit code {e.returncode}): {error_details}")
# Import the configuration
result = self._run_nmcli([
'connection', 'import', 'type', 'openvpn', 'file', str(ovpn_file)
])
def _get_vpntray_connection_name(self, config_filename: str) -> str:
"""Generate VPNTray-specific connection name."""
# Remove extension and sanitize
base_name = Path(config_filename).stem
sanitized = re.sub(r'[^a-zA-Z0-9_-]', '_', base_name)
return f"{self.VPNTRAY_PREFIX}{sanitized}"
# Extract connection name from output
# nmcli typically outputs: "Connection 'name' (uuid) successfully added."
match = re.search(r"Connection '([^']+)'", result.stdout)
if not match:
raise VPNConnectionError(
"Failed to parse imported connection name")
imported_name = match.group(1)
# Rename if custom name provided
if connection_name and connection_name != imported_name:
self._run_nmcli([
'connection', 'modify', imported_name,
'connection.id', connection_name
])
return connection_name
return imported_name
def connect(self, connection_name: str,
username: Optional[str] = None,
password: Optional[str] = None) -> None:
"""Connect to a VPN.
Args:
connection_name: Name of the NetworkManager connection
username: Optional username for authentication
password: Optional password for authentication
"""
if username and password:
# Create temporary secrets file
with tempfile.NamedTemporaryFile(mode='w', delete=False) as f:
f.write(f"vpn.secrets.password:{password}\n")
if username:
f.write(f"vpn.data.username:{username}\n")
secrets_file = f.name
try:
self._run_nmcli([
'connection', 'up', connection_name,
'passwd-file', secrets_file
])
finally:
# Always clean up secrets file
os.unlink(secrets_file)
else:
# Connect without credentials (will prompt if needed)
self._run_nmcli(['connection', 'up', connection_name])
def disconnect(self, connection_name: str) -> None:
"""Disconnect from a VPN.
Args:
connection_name: Name of the NetworkManager connection
"""
self._run_nmcli(['connection', 'down', connection_name])
def get_status(self, connection_name: str) -> VPNStatus:
"""Get the status of a VPN connection.
Args:
connection_name: Name of the NetworkManager connection
Returns:
Current status of the VPN connection
"""
result = self._run_nmcli(
['connection', 'show', '--active'],
check=False
)
if connection_name in result.stdout:
# Parse the actual state
state_result = self._run_nmcli(
['connection', 'show', connection_name],
check=False
)
if 'GENERAL.STATE:' in state_result.stdout:
if 'activated' in state_result.stdout:
return VPNStatus.CONNECTED
elif 'activating' in state_result.stdout:
return VPNStatus.CONNECTING
elif 'deactivating' in state_result.stdout:
return VPNStatus.DISCONNECTING
return VPNStatus.DISCONNECTED
def list_connections(self, vpn_only: bool = True) -> List[VPNConnection]:
"""List all NetworkManager connections.
Args:
vpn_only: If True, only return VPN connections
Returns:
List of VPNConnection objects
"""
args = ['connection', 'show']
if vpn_only:
args.extend(['--type', 'vpn'])
result = self._run_nmcli(args, check=False)
def get_vpn_config_path(self, filename: str) -> Path:
"""Get full path to VPN config file."""
return self.VPN_CONFIG_DIR / filename
def list_vpntray_connections(self) -> List[VPNConnectionInfo]:
"""List all VPNTray-managed connections."""
connections = []
for line in result.stdout.strip().split('\n')[1:]: # Skip header
if not line:
continue
parts = line.split()
if len(parts) >= 4:
name = parts[0]
uuid = parts[1]
conn_type = parts[2]
device = parts[3] if parts[3] != '--' else None
try:
result = self._run_nmcli(['connection', 'show'])
for line in result.stdout.strip().split('\\n'):
if self.VPNTRAY_PREFIX in line:
parts = line.split()
if len(parts) >= 4:
name = parts[0]
uuid = parts[1]
device = parts[3] if parts[3] != '--' else None
# Get current status
status = self.get_status(name)
# Get detailed status
status = self._get_connection_status(name)
connections.append(VPNConnection(
name=name,
uuid=uuid,
type=conn_type,
device=device,
state=status
))
connections.append(VPNConnectionInfo(
name=name,
uuid=uuid,
vpntray_name=name,
status=status,
device=device
))
except VPNConnectionError:
pass # No connections or nmcli error
return connections
def delete_connection(self, connection_name: str) -> None:
"""Delete a NetworkManager connection.
def _get_connection_status(self, connection_name: str) -> VPNStatus:
"""Get the status of a specific connection."""
try:
result = self._run_nmcli(['connection', 'show', connection_name])
Args:
connection_name: Name of the connection to delete
"""
self._run_nmcli(['connection', 'delete', connection_name])
# Parse connection state from output
for line in result.stdout.split('\\n'):
if 'GENERAL.STATE:' in line:
state = line.split(':')[1].strip()
if 'activated' in state.lower():
return VPNStatus.CONNECTED
elif 'activating' in state.lower():
return VPNStatus.CONNECTING
elif 'deactivating' in state.lower():
return VPNStatus.DISCONNECTING
else:
return VPNStatus.DISCONNECTED
except VPNConnectionError:
pass
def connection_exists(self, connection_name: str) -> bool:
"""Check if a connection exists.
return VPNStatus.UNKNOWN
Args:
connection_name: Name of the connection to check
def import_vpn_config(self, location: Location) -> str:
"""Import VPN configuration for a location with VPNTray naming."""
config_path = self.get_vpn_config_path(location.vpn_config)
Returns:
True if the connection exists
"""
result = self._run_nmcli(
['connection', 'show', connection_name],
check=False
)
return result.returncode == 0
if not config_path.exists():
raise VPNConnectionError(f"VPN config not found: {config_path}")
def modify_connection(self, connection_name: str,
settings: Dict[str, str]) -> None:
"""Modify connection settings.
self.logger.info(
f"Config file exists: {config_path} ({config_path.stat().st_size} bytes)")
Args:
connection_name: Name of the connection to modify
settings: Dictionary of setting key-value pairs
e.g., {'vpn.data.comp-lzo': 'yes'}
"""
for key, value in settings.items():
vpntray_name = self._get_vpntray_connection_name(location.vpn_config)
# Check if already imported
if self._get_connection_by_name(vpntray_name):
self.logger.info(f"Connection already imported: {vpntray_name}")
return vpntray_name
# Import based on VPN type
self.logger.info(
f"Importing {location.vpn_type.value} config: {config_path.name}")
if location.vpn_type.value == "OpenVPN":
return self._import_openvpn(config_path, vpntray_name, location)
elif location.vpn_type.value == "WireGuard":
return self._import_wireguard(config_path, vpntray_name, location)
else:
raise VPNConnectionError(
f"Unsupported VPN type: {location.vpn_type.value}")
def _import_openvpn(self, config_path: Path, vpntray_name: str, location: Location) -> str:
"""Import OpenVPN configuration with route control."""
# Import the config file first (nmcli will auto-generate a name)
import_args = [
'connection', 'import', 'type', 'openvpn',
'file', str(config_path)
]
self.logger.info(f"Running nmcli import: {' '.join(import_args)}")
try:
result = self._run_nmcli(import_args)
# Extract the auto-generated connection name from the output
# nmcli outputs: "Connection 'name' (uuid) successfully added."
import re
match = re.search(r"Connection '([^']+)'", result.stdout)
if not match:
raise VPNConnectionError(
"Failed to parse imported connection name from nmcli output")
auto_generated_name = match.group(1)
self.logger.info(
f"Config imported with auto name: {auto_generated_name}")
# Rename to our VPNTray naming convention
rename_args = [
'connection', 'modify', auto_generated_name,
'connection.id', vpntray_name
]
self.logger.info(f"Renaming to: {vpntray_name}")
self._run_nmcli(rename_args)
self.logger.info(
f"OpenVPN config imported as {vpntray_name}")
except VPNConnectionError as e:
self.logger.error(f"OpenVPN import failed: {e}")
raise
# Configure credentials immediately after import if provided
if location.vpn_credentials:
self._configure_credentials(vpntray_name, location)
# Configure the connection to not route everything by default
self._configure_connection_routes(vpntray_name, location)
return vpntray_name
def _import_wireguard(self, config_path: Path, vpntray_name: str, location: Location) -> str:
"""Import WireGuard configuration with route control."""
# Import the config file first (nmcli will auto-generate a name)
import_args = [
'connection', 'import', 'type', 'wireguard',
'file', str(config_path)
]
self.logger.info(
f"Running nmcli import: {' '.join(import_args)}")
try:
result = self._run_nmcli(import_args)
# Extract the auto-generated connection name from the output
# nmcli outputs: "Connection 'name' (uuid) successfully added."
import re
match = re.search(r"Connection '([^']+)'", result.stdout)
if not match:
raise VPNConnectionError(
"Failed to parse imported connection name from nmcli output")
auto_generated_name = match.group(1)
self.logger.info(
f"Config imported with auto name: {auto_generated_name}")
# Rename to our VPNTray naming convention
rename_args = [
'connection', 'modify', auto_generated_name,
'connection.id', vpntray_name
]
self.logger.info(f"Renaming to: {vpntray_name}")
self._run_nmcli(rename_args)
self.logger.info(
f"WireGuard config imported as {vpntray_name}")
except VPNConnectionError as e:
self.logger.error(f"WireGuard import failed: {e}")
raise
# Configure credentials immediately after import if provided
if location.vpn_credentials:
self._configure_credentials(vpntray_name, location)
# Configure routes
self._configure_connection_routes(vpntray_name, location)
return vpntray_name
def _configure_connection_routes(self, connection_name: str, location: Location) -> None:
"""Configure connection to only route specified network segments."""
try:
# Disable automatic default route
self._run_nmcli([
'connection', 'modify', connection_name,
key, value
'ipv4.never-default', 'true'
])
def get_connection_details(self, connection_name: str) -> Dict[str, str]:
"""Get detailed information about a connection.
# Add routes for each network segment
routes = []
for segment in location.network_segments:
# Add route for the network segment
routes.append(segment.cidr)
Args:
connection_name: Name of the connection
if routes:
routes_str = ','.join(routes)
self._run_nmcli([
'connection', 'modify', connection_name,
'ipv4.routes', routes_str
])
self.logger.info(
f"Configured routes for {connection_name}: {routes_str}")
Returns:
Dictionary of connection properties
"""
result = self._run_nmcli(['connection', 'show', connection_name])
except VPNConnectionError as e:
self.logger.error(f"Failed to configure routes: {e}")
# Don't fail the import, just log the error
details = {}
for line in result.stdout.strip().split('\n'):
if ':' in line:
key, value = line.split(':', 1)
details[key.strip()] = value.strip()
return details
def get_active_vpn_interface(self, connection_name: str) -> Optional[str]:
"""Get the network interface used by an active VPN connection.
Args:
connection_name: Name of the VPN connection
Returns:
Interface name (e.g., 'tun0') or None if not connected
"""
if self.get_status(connection_name) != VPNStatus.CONNECTED:
def _get_connection_by_name(self, name: str) -> Optional[VPNConnectionInfo]:
"""Get connection info by name."""
try:
# Check if connection exists (simple and fast)
result = self._run_nmcli(['connection', 'show', name], check=False)
if result.returncode == 0:
# Connection exists, create minimal info object
return VPNConnectionInfo(
name=name,
uuid="unknown",
vpntray_name=name,
status=VPNStatus.UNKNOWN # Status will be checked when needed
)
return None
except VPNConnectionError:
return None
details = self.get_connection_details(connection_name)
return details.get('GENERAL.DEVICES')
def connect_vpn(self, location: Location) -> bool:
"""Connect to VPN for a location."""
try:
vpntray_name = self._get_vpntray_connection_name(
location.vpn_config)
config_path = self.get_vpn_config_path(location.vpn_config)
self.logger.info(f"VPN config: {config_path}")
self.logger.info(f"Connection name: {vpntray_name}")
def get_vpn_ip_address(self, connection_name: str) -> Optional[str]:
"""Get the IP address assigned to the VPN connection.
# Check if config file exists
if not config_path.exists():
error_msg = f"VPN config file not found: {config_path}"
self.logger.error(error_msg)
return False
Args:
connection_name: Name of the VPN connection
# Import if not already imported
existing_conn = self._get_connection_by_name(vpntray_name)
if not existing_conn:
self.logger.info(
"Importing VPN config for first time...")
try:
self.import_vpn_config(location)
self.logger.info(
"VPN config imported successfully")
except Exception as import_error:
error_msg = f"Failed to import VPN config: {import_error}"
self.logger.error(error_msg)
return False
else:
self.logger.info(
f"Using existing connection: {existing_conn.status.value}")
Returns:
IP address or None if not connected
"""
interface = self.get_active_vpn_interface(connection_name)
if not interface:
return None
# Connect with simple command - credentials already set during import
self.logger.info("Attempting connection...")
result = self._run_nmcli(['device', 'show', interface], check=False)
# Simple connection command without credential complications
connect_args = ['connection', 'up', vpntray_name]
self._run_nmcli(connect_args, timeout=60)
self.logger.info(f"Connected to {vpntray_name}")
for line in result.stdout.split('\n'):
if 'IP4.ADDRESS' in line and 'IP4.ADDRESS[2]' not in line:
# Format is usually "IP4.ADDRESS[1]: 10.0.0.1/24"
if ':' in line:
addr_part = line.split(':', 1)[1].strip()
if '/' in addr_part:
return addr_part.split('/')[0]
return True
return None
except VPNConnectionError as e:
self.logger.error(f"VPN connection failed: {e}")
return False
except Exception as e:
self.logger.error(
f"Unexpected error during connection: {e}")
return False
def disconnect_vpn(self, location: Location) -> bool:
"""Disconnect VPN for a location."""
try:
vpntray_name = self._get_vpntray_connection_name(
location.vpn_config)
self.logger.info(f"Disconnecting from {vpntray_name}...")
# Check if connection exists
existing_conn = self._get_connection_by_name(vpntray_name)
if not existing_conn:
self.logger.error(
f"Connection {vpntray_name} not found")
return False
# Disconnect
self._run_nmcli(['connection', 'down', vpntray_name])
self.logger.info(f"Disconnected from {vpntray_name}")
return True
except VPNConnectionError as e:
self.logger.error(f"Failed to disconnect: {e}")
return False
except Exception as e:
self.logger.error(
f"Unexpected error during disconnection: {e}")
return False
def get_connection_status(self, location: Location) -> VPNStatus:
"""Get connection status for a location."""
vpntray_name = self._get_vpntray_connection_name(location.vpn_config)
return self._get_connection_status(vpntray_name)
def remove_vpn_config(self, location: Location) -> bool:
"""Remove VPN connection configuration."""
try:
vpntray_name = self._get_vpntray_connection_name(
location.vpn_config)
# First disconnect if connected
try:
self._run_nmcli(
['connection', 'down', vpntray_name], check=False)
except VPNConnectionError:
pass # Ignore if already disconnected
# Remove the connection
self._run_nmcli(['connection', 'delete', vpntray_name])
self.logger.info(
f"Removed VPN configuration {vpntray_name}")
return True
except VPNConnectionError as e:
self.logger.error(f"Failed to remove config: {e}")
return False
def cleanup_vpntray_connections(self) -> int:
"""Remove all VPNTray-managed connections. Returns count removed."""
connections = self.list_vpntray_connections()
removed_count = 0
for conn in connections:
try:
# Disconnect first
self._run_nmcli(['connection', 'down', conn.name], check=False)
# Remove
self._run_nmcli(['connection', 'delete', conn.name])
removed_count += 1
except VPNConnectionError:
pass # Continue with other connections
if self.logger and removed_count > 0:
self.logger.info(
f"Cleaned up {removed_count} VPNTray connections")
return removed_count
def _configure_credentials(self, connection_name: str, location: Location) -> None:
"""Configure VPN credentials directly in the connection."""
if not location.vpn_credentials:
self.logger.info(
f"No credentials provided for {connection_name}")
return
try:
# Handle dictionary credentials (username/password)
if isinstance(location.vpn_credentials, dict):
username = location.vpn_credentials.get('username')
password = location.vpn_credentials.get('password')
self.logger.info(
f"Setting credentials for {connection_name}...")
# Set username and password with correct nmcli syntax
if username:
self._run_nmcli([
'connection', 'modify', connection_name,
'+vpn.data', f'username={username}'
])
self.logger.info(
f"Username configured for {connection_name}")
if password:
self._run_nmcli([
'connection', 'modify', connection_name,
'+vpn.secrets', f'password={password}'
])
self.logger.info(
f"Password configured for {connection_name}")
if username and password:
self.logger.info(
f"Full credentials configured for {connection_name}")
elif username or password:
self.logger.info(
f"Partial credentials configured for {connection_name}")
except VPNConnectionError as e:
self.logger.error(f"Failed to configure credentials: {e}")
# Don't fail the whole operation for credential issues