GPU monitor classes split into separate files

2025-09-25 13:26:30 +02:00
parent 50409c84eb
commit 2a2cd12f43
3 changed files with 532 additions and 515 deletions


@@ -0,0 +1,309 @@
import os
import subprocess
import re
import time
import logging
from typing import Dict, Any, Optional
logger = logging.getLogger(__name__)
class GPUAMDMonitor:
def __init__(self):
self.last_update = None
self.cache_duration = 2 # seconds
self._cached_data = {}
self.gpu_available = self._check_gpu_availability()
def _check_gpu_availability(self) -> bool:
"""Check if AMD GPU monitoring tools are available"""
try:
# Check for rocm-smi (AMD)
result = subprocess.run(['rocm-smi', '--help'],
capture_output=True, text=True, timeout=5)
if result.returncode == 0:
return True
except (subprocess.TimeoutExpired, FileNotFoundError):
pass
try:
# Check for radeontop
result = subprocess.run(['radeontop', '--help'],
capture_output=True, text=True, timeout=5)
if result.returncode == 0:
return True
except (subprocess.TimeoutExpired, FileNotFoundError):
pass
# Check for GPU in /sys/class/drm
try:
gpu_dirs = [d for d in os.listdir('/sys/class/drm') if d.startswith('card')]
return len(gpu_dirs) > 0
except OSError:
pass
return False
def get_gpu_info(self) -> Dict[str, Any]:
"""Get static GPU information"""
if not self.gpu_available:
return {'available': False, 'message': 'No GPU monitoring tools found'}
if not self._cached_data.get('gpu_info'):
try:
gpu_info = self._get_rocm_info()
if not gpu_info:
gpu_info = self._get_sys_gpu_info()
self._cached_data['gpu_info'] = gpu_info
except Exception as e:
logger.error(f"Error getting GPU info: {e}")
self._cached_data['gpu_info'] = {'available': False, 'error': str(e)}
return self._cached_data['gpu_info']
def get_gpu_stats(self) -> Dict[str, Any]:
"""Get real-time GPU statistics"""
if not self.gpu_available:
return {'available': False}
now = time.time()
if (self.last_update is None or
now - self.last_update > self.cache_duration):
try:
stats = self._get_rocm_stats()
if not stats:
stats = self._get_fallback_stats()
stats['timestamp'] = now
self._cached_data['stats'] = stats
self.last_update = now
except Exception as e:
logger.error(f"Error getting GPU stats: {e}")
self._cached_data['stats'] = {'available': False, 'error': str(e)}
return self._cached_data.get('stats', {'available': False})
def _get_rocm_info(self) -> Optional[Dict[str, Any]]:
"""Get GPU info using rocm-smi"""
try:
result = subprocess.run(['rocm-smi', '--showid', '--showproductname'],
capture_output=True, text=True, timeout=10)
if result.returncode == 0:
lines = result.stdout.strip().split('\n')
gpu_info = {'available': True, 'driver': 'rocm-smi', 'cards': []}
for line in lines:
if 'GPU[' in line and ':' in line:
# Parse GPU ID and name
parts = line.split(':')
if len(parts) >= 2:
gpu_id = parts[0].strip()
gpu_name = parts[1].strip()
gpu_info['cards'].append({
'id': gpu_id,
'name': gpu_name
})
return gpu_info if gpu_info['cards'] else None
except Exception as e:
logger.debug(f"rocm-smi not available: {e}")
return None
def _get_rocm_stats(self) -> Optional[Dict[str, Any]]:
"""Get GPU stats using rocm-smi"""
try:
# Get temperature, usage, and memory info
result = subprocess.run(['rocm-smi', '--showtemp', '--showuse', '--showmeminfo'],
capture_output=True, text=True, timeout=10)
if result.returncode == 0:
stats = {'available': True, 'cards': []}
lines = result.stdout.strip().split('\n')
current_gpu = None
for line in lines:
line = line.strip()
# Parse GPU identifier
if line.startswith('GPU['):
gpu_match = re.search(r'GPU\[(\d+)\]', line)
if gpu_match:
current_gpu = {
'id': int(gpu_match.group(1)),
'temperature': None,
'usage': None,
'memory_used': None,
'memory_total': None
}
stats['cards'].append(current_gpu)
# Parse temperature
elif 'Temperature' in line and current_gpu is not None:
temp_match = re.search(r'(\d+\.\d+)°C', line)
if temp_match:
current_gpu['temperature'] = float(temp_match.group(1))
# Parse GPU usage
elif 'GPU use' in line and current_gpu is not None:
usage_match = re.search(r'(\d+)%', line)
if usage_match:
current_gpu['usage'] = int(usage_match.group(1))
# Parse memory info
elif 'Memory' in line and current_gpu is not None:
mem_match = re.search(r'(\d+)MB / (\d+)MB', line)
if mem_match:
current_gpu['memory_used'] = int(mem_match.group(1))
current_gpu['memory_total'] = int(mem_match.group(2))
return stats if stats['cards'] else None
except Exception as e:
logger.debug(f"rocm-smi stats not available: {e}")
return None
def _get_sys_gpu_info(self) -> Dict[str, Any]:
"""Get GPU info from /sys filesystem"""
try:
gpu_info = {'available': True, 'driver': 'sysfs', 'cards': []}
drm_path = '/sys/class/drm'
if os.path.exists(drm_path):
card_dirs = [d for d in os.listdir(drm_path) if d.startswith('card') and not d.endswith('-')]
for card_dir in sorted(card_dirs):
card_path = os.path.join(drm_path, card_dir)
device_path = os.path.join(card_path, 'device')
gpu_name = "Unknown GPU"
# Try to get GPU name from various sources
for name_file in ['product_name', 'device/product_name']:
name_path = os.path.join(card_path, name_file)
if os.path.exists(name_path):
try:
with open(name_path, 'r') as f:
gpu_name = f.read().strip()
break
except OSError:
pass
# Try vendor and device IDs
if gpu_name == "Unknown GPU":
try:
vendor_path = os.path.join(device_path, 'vendor')
device_id_path = os.path.join(device_path, 'device')
if os.path.exists(vendor_path) and os.path.exists(device_id_path):
with open(vendor_path, 'r') as f:
vendor = f.read().strip()
with open(device_id_path, 'r') as f:
device_id = f.read().strip()
if vendor == '0x1002': # AMD vendor ID
gpu_name = f"AMD GPU ({device_id})"
except OSError:
pass
gpu_info['cards'].append({
'id': card_dir,
'name': gpu_name,
'path': card_path
})
return gpu_info if gpu_info['cards'] else {'available': False}
except Exception as e:
logger.error(f"Error getting sysfs GPU info: {e}")
return {'available': False, 'error': str(e)}
def _get_fallback_stats(self) -> Dict[str, Any]:
"""Get basic GPU stats from /sys filesystem"""
try:
stats = {'available': True, 'cards': []}
# Try to read basic info from sysfs
drm_path = '/sys/class/drm'
if os.path.exists(drm_path):
card_dirs = [d for d in os.listdir(drm_path) if d.startswith('card') and not d.endswith('-')]
for card_dir in sorted(card_dirs):
card_path = os.path.join(drm_path, card_dir)
device_path = os.path.join(card_path, 'device')
gpu_stats = {
'id': card_dir,
'temperature': None,
'usage': None,
'memory_used': None,
'memory_total': None
}
# Try to read temperature from hwmon
hwmon_path = os.path.join(device_path, 'hwmon')
if os.path.exists(hwmon_path):
for hwmon_dir in os.listdir(hwmon_path):
temp_file = os.path.join(hwmon_path, hwmon_dir, 'temp1_input')
if os.path.exists(temp_file):
try:
with open(temp_file, 'r') as f:
temp_millicelsius = int(f.read().strip())
gpu_stats['temperature'] = temp_millicelsius / 1000.0
break
except (OSError, ValueError):
pass
# Try to read GPU usage (if available)
gpu_busy_file = os.path.join(device_path, 'gpu_busy_percent')
if os.path.exists(gpu_busy_file):
try:
with open(gpu_busy_file, 'r') as f:
gpu_stats['usage'] = int(f.read().strip())
except (OSError, ValueError):
pass
stats['cards'].append(gpu_stats)
return stats if stats['cards'] else {'available': False}
except Exception as e:
logger.error(f"Error getting fallback GPU stats: {e}")
return {'available': False, 'error': str(e)}
def get_primary_gpu_stats(self) -> Dict[str, Any]:
"""Get stats for the primary/first GPU"""
all_stats = self.get_gpu_stats()
if not all_stats.get('available') or not all_stats.get('cards'):
return {
'available': False,
'usage': 0,
'temperature': 0,
'memory_percent': 0
}
primary_gpu = all_stats['cards'][0]
# Calculate memory percentage
memory_percent = 0
if (primary_gpu.get('memory_used') is not None and
primary_gpu.get('memory_total') is not None and
primary_gpu['memory_total'] > 0):
memory_percent = (primary_gpu['memory_used'] / primary_gpu['memory_total']) * 100
return {
'available': True,
'usage': primary_gpu.get('usage', 0) or 0,
'temperature': primary_gpu.get('temperature', 0) or 0,
'memory_percent': memory_percent,
'memory_used': primary_gpu.get('memory_used', 0) or 0,
'memory_total': primary_gpu.get('memory_total', 0) or 0
}
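That is the whole of the new AMD module. A minimal usage sketch (the module name matches the import added to monitor.py below, and the dictionary keys are the ones returned by get_primary_gpu_stats above):

# Minimal usage sketch for the new AMD module.
from gpu_amd_monitor import GPUAMDMonitor

amd = GPUAMDMonitor()
if amd.gpu_available:
    stats = amd.get_primary_gpu_stats()
    print(f"AMD GPU: {stats['usage']}% load, {stats['temperature']}°C, "
          f"{stats['memory_percent']:.1f}% VRAM used")
else:
    print("No AMD GPU monitoring tools found")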


@@ -6,6 +6,8 @@ from dataclasses import dataclass, field
from typing import Dict, Any, Optional, List
from enum import Enum
from nicegui import binding
from gpu_amd_monitor import GPUAMDMonitor
from gpu_nvidia_monitor import GPUNVIDIAMonitor
logger = logging.getLogger(__name__)
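With both backend classes moved into their own modules, monitor.py only needs the two imports above. The GPUMonitor code itself is not shown in this hunk; purely as an illustration of how a caller could choose between the split backends (not the actual GPUMonitor logic), one might probe each class's gpu_available flag:

# Illustrative only: GPUMonitor's real selection logic is elided in this diff.
def pick_gpu_backend():
    for backend_cls in (GPUNVIDIAMonitor, GPUAMDMonitor):
        backend = backend_cls()  # constructor runs its own availability check
        if backend.gpu_available:
            return backend
    return None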
@@ -139,522 +141,8 @@ class GPUMonitor:
logger.error(f"Error updating GPU stats: {e}")
if __name__ == "__main__":
monitor = GPUMonitor()
from pprint import pprint
print(monitor)
pprint(monitor.gpu_list)


@@ -0,0 +1,220 @@
import subprocess
import re
import time
import logging
from typing import Dict, Any
logger = logging.getLogger(__name__)
class GPUNVIDIAMonitor:
def __init__(self):
self.last_update = None
self.cache_duration = 2 # seconds
self._cached_data = {}
self.gpu_available = self._check_gpu_availability()
def _check_gpu_availability(self) -> bool:
"""Check if NVIDIA GPU monitoring tools are available"""
try:
# Check for nvidia-smi
result = subprocess.run(['nvidia-smi', '--help'],
capture_output=True, text=True, timeout=5)
if result.returncode == 0:
return True
except (subprocess.TimeoutExpired, FileNotFoundError):
pass
return False
def get_gpu_info(self) -> Dict[str, Any]:
"""Get static GPU information"""
if not self.gpu_available:
return {'available': False, 'message': 'No NVIDIA GPU monitoring tools found'}
if not self._cached_data.get('gpu_info'):
try:
gpu_info = self._get_nvidia_info()
self._cached_data['gpu_info'] = gpu_info
except Exception as e:
logger.error(f"Error getting GPU info: {e}")
self._cached_data['gpu_info'] = {'available': False, 'error': str(e)}
return self._cached_data['gpu_info']
def get_gpu_stats(self) -> Dict[str, Any]:
"""Get real-time GPU statistics"""
if not self.gpu_available:
return {'available': False}
now = time.time()
if (self.last_update is None or
now - self.last_update > self.cache_duration):
try:
stats = self._get_nvidia_stats()
stats['timestamp'] = now
self._cached_data['stats'] = stats
self.last_update = now
except Exception as e:
logger.error(f"Error getting GPU stats: {e}")
self._cached_data['stats'] = {'available': False, 'error': str(e)}
return self._cached_data.get('stats', {'available': False})
def _get_nvidia_info(self) -> Dict[str, Any]:
"""Get GPU info using nvidia-smi"""
try:
# Get GPU name, driver version, and CUDA version
result = subprocess.run(['nvidia-smi', '--query-gpu=index,name,driver_version',
'--format=csv,noheader'],
capture_output=True, text=True, timeout=10)
if result.returncode == 0:
gpu_info = {'available': True, 'driver': 'nvidia-smi', 'cards': []}
# Get driver and CUDA version from general output
version_result = subprocess.run(['nvidia-smi'],
capture_output=True, text=True, timeout=10)
if version_result.returncode == 0:
# Parse driver version
driver_match = re.search(r'Driver Version:\s*(\S+)', version_result.stdout)
if driver_match:
gpu_info['driver_version'] = driver_match.group(1)
# Parse CUDA version
cuda_match = re.search(r'CUDA Version:\s*(\S+)', version_result.stdout)
if cuda_match:
gpu_info['cuda_version'] = cuda_match.group(1)
# Parse GPU info
lines = result.stdout.strip().split('\n')
for line in lines:
parts = [p.strip() for p in line.split(',')]
if len(parts) >= 3:
gpu_info['cards'].append({
'id': int(parts[0]),
'name': parts[1],
'driver_version': parts[2]
})
return gpu_info if gpu_info['cards'] else {'available': False}
return {'available': False}
except Exception as e:
logger.debug(f"nvidia-smi not available: {e}")
return {'available': False, 'error': str(e)}
def _get_nvidia_stats(self) -> Dict[str, Any]:
"""Get GPU stats using nvidia-smi"""
try:
# Query multiple metrics at once
result = subprocess.run([
'nvidia-smi',
'--query-gpu=index,temperature.gpu,utilization.gpu,memory.used,memory.total,power.draw,power.limit',
'--format=csv,noheader,nounits'
], capture_output=True, text=True, timeout=10)
if result.returncode == 0:
stats = {'available': True, 'cards': []}
lines = result.stdout.strip().split('\n')
for line in lines:
parts = [p.strip() for p in line.split(',')]
if len(parts) >= 5:
gpu_stats = {
'id': int(parts[0]),
'temperature': None,
'usage': None,
'memory_used': None,
'memory_total': None,
'power_draw': None,
'power_limit': None
}
# Parse temperature
if parts[1] and parts[1] != '[N/A]':
try:
gpu_stats['temperature'] = float(parts[1])
except ValueError:
pass
# Parse GPU utilization
if parts[2] and parts[2] != '[N/A]':
try:
gpu_stats['usage'] = int(parts[2])
except ValueError:
pass
# Parse memory usage
if parts[3] and parts[3] != '[N/A]':
try:
gpu_stats['memory_used'] = int(parts[3])
except ValueError:
pass
if parts[4] and parts[4] != '[N/A]':
try:
gpu_stats['memory_total'] = int(parts[4])
except ValueError:
pass
# Parse power stats if available
if len(parts) >= 7:
if parts[5] and parts[5] != '[N/A]':
try:
gpu_stats['power_draw'] = float(parts[5])
except ValueError:
pass
if parts[6] and parts[6] != '[N/A]':
try:
gpu_stats['power_limit'] = float(parts[6])
except ValueError:
pass
stats['cards'].append(gpu_stats)
return stats if stats['cards'] else {'available': False}
return {'available': False}
except Exception as e:
logger.debug(f"nvidia-smi stats not available: {e}")
return {'available': False, 'error': str(e)}
def get_primary_gpu_stats(self) -> Dict[str, Any]:
"""Get stats for the primary/first GPU"""
all_stats = self.get_gpu_stats()
if not all_stats.get('available') or not all_stats.get('cards'):
return {
'available': False,
'usage': 0,
'temperature': 0,
'memory_percent': 0,
'power_draw': 0,
'power_limit': 0
}
primary_gpu = all_stats['cards'][0]
# Calculate memory percentage
memory_percent = 0
if (primary_gpu.get('memory_used') is not None and
primary_gpu.get('memory_total') is not None and
primary_gpu['memory_total'] > 0):
memory_percent = (primary_gpu['memory_used'] / primary_gpu['memory_total']) * 100
return {
'available': True,
'usage': primary_gpu.get('usage', 0) or 0,
'temperature': primary_gpu.get('temperature', 0) or 0,
'memory_percent': memory_percent,
'memory_used': primary_gpu.get('memory_used', 0) or 0,
'memory_total': primary_gpu.get('memory_total', 0) or 0,
'power_draw': primary_gpu.get('power_draw', 0) or 0,
'power_limit': primary_gpu.get('power_limit', 0) or 0
}
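A matching usage sketch for the new NVIDIA module (module name as imported in monitor.py; nvidia-smi reports power in watts and memory in MiB):

# Minimal usage sketch for the new NVIDIA module.
from gpu_nvidia_monitor import GPUNVIDIAMonitor

nv = GPUNVIDIAMonitor()
if nv.gpu_available:
    stats = nv.get_primary_gpu_stats()
    print(f"NVIDIA GPU: {stats['usage']}% load, {stats['temperature']}°C, "
          f"{stats['power_draw']}/{stats['power_limit']} W, "
          f"{stats['memory_percent']:.1f}% VRAM used")
else:
    print("nvidia-smi not found")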