diff --git a/src/utils/gpu_amd_monitor.py b/src/utils/gpu_amd_monitor.py
new file mode 100644
index 0000000..35184cc
--- /dev/null
+++ b/src/utils/gpu_amd_monitor.py
@@ -0,0 +1,309 @@
+import subprocess
+import re
+import time
+import logging
+from typing import Dict, Any, Optional
+
+logger = logging.getLogger(__name__)
+
+
+class GPUAMDMonitor:
+    def __init__(self):
+        self.last_update = None
+        self.cache_duration = 2  # seconds
+        self._cached_data = {}
+        self.gpu_available = self._check_gpu_availability()
+
+    def _check_gpu_availability(self) -> bool:
+        """Check if AMD GPU monitoring tools are available"""
+        try:
+            # Check for rocm-smi (AMD)
+            result = subprocess.run(['rocm-smi', '--help'],
+                                    capture_output=True, text=True, timeout=5)
+            if result.returncode == 0:
+                return True
+        except (subprocess.TimeoutExpired, FileNotFoundError):
+            pass
+
+        try:
+            # Check for radeontop
+            result = subprocess.run(['radeontop', '--help'],
+                                    capture_output=True, text=True, timeout=5)
+            if result.returncode == 0:
+                return True
+        except (subprocess.TimeoutExpired, FileNotFoundError):
+            pass
+
+        # Check for GPU in /sys/class/drm
+        try:
+            import os
+            gpu_dirs = [d for d in os.listdir('/sys/class/drm') if d.startswith('card')]
+            return len(gpu_dirs) > 0
+        except:
+            pass
+
+        return False
+
+    def get_gpu_info(self) -> Dict[str, Any]:
+        """Get static GPU information"""
+        if not self.gpu_available:
+            return {'available': False, 'message': 'No GPU monitoring tools found'}
+
+        if not self._cached_data.get('gpu_info'):
+            try:
+                gpu_info = self._get_rocm_info()
+                if not gpu_info:
+                    gpu_info = self._get_sys_gpu_info()
+
+                self._cached_data['gpu_info'] = gpu_info
+            except Exception as e:
+                logger.error(f"Error getting GPU info: {e}")
+                self._cached_data['gpu_info'] = {'available': False, 'error': str(e)}
+
+        return self._cached_data['gpu_info']
+
+    def get_gpu_stats(self) -> Dict[str, Any]:
+        """Get real-time GPU statistics"""
+        if not self.gpu_available:
+            return {'available': False}
+
+        now = time.time()
+        if (self.last_update is None or
+                now - self.last_update > self.cache_duration):
+
+            try:
+                stats = self._get_rocm_stats()
+                if not stats:
+                    stats = self._get_fallback_stats()
+
+                stats['timestamp'] = now
+                self._cached_data['stats'] = stats
+                self.last_update = now
+
+            except Exception as e:
+                logger.error(f"Error getting GPU stats: {e}")
+                self._cached_data['stats'] = {'available': False, 'error': str(e)}
+
+        return self._cached_data.get('stats', {'available': False})
+
+    def _get_rocm_info(self) -> Optional[Dict[str, Any]]:
+        """Get GPU info using rocm-smi"""
+        try:
+            result = subprocess.run(['rocm-smi', '--showid', '--showproductname'],
+                                    capture_output=True, text=True, timeout=10)
+
+            if result.returncode == 0:
+                lines = result.stdout.strip().split('\n')
+                gpu_info = {'available': True, 'driver': 'rocm-smi', 'cards': []}
+
+                for line in lines:
+                    if 'GPU[' in line and ':' in line:
+                        # Parse GPU ID and name
+                        parts = line.split(':')
+                        if len(parts) >= 2:
+                            gpu_id = parts[0].strip()
+                            gpu_name = parts[1].strip()
+                            gpu_info['cards'].append({
+                                'id': gpu_id,
+                                'name': gpu_name
+                            })
+
+                return gpu_info if gpu_info['cards'] else None
+
+        except Exception as e:
+            logger.debug(f"rocm-smi not available: {e}")
+            return None
+
+    def _get_rocm_stats(self) -> Optional[Dict[str, Any]]:
+        """Get GPU stats using rocm-smi"""
+        try:
+            # Get temperature, usage, and memory info
+            result = subprocess.run(['rocm-smi', '--showtemp', '--showuse', '--showmeminfo'],
+                                    capture_output=True, text=True, timeout=10)
+
+            if result.returncode == 0:
+                stats = {'available': True, 'cards': []}
+
+                lines = result.stdout.strip().split('\n')
+                current_gpu = None
+
+                for line in lines:
+                    line = line.strip()
+
+                    # Parse GPU identifier
+                    if line.startswith('GPU['):
+                        gpu_match = re.search(r'GPU\[(\d+)\]', line)
+                        if gpu_match:
+                            current_gpu = {
+                                'id': int(gpu_match.group(1)),
+                                'temperature': None,
+                                'usage': None,
+                                'memory_used': None,
+                                'memory_total': None
+                            }
+                            stats['cards'].append(current_gpu)
+
+                    # Parse temperature
+                    elif 'Temperature' in line and current_gpu is not None:
+                        temp_match = re.search(r'(\d+\.\d+)°C', line)
+                        if temp_match:
+                            current_gpu['temperature'] = float(temp_match.group(1))
+
+                    # Parse GPU usage
+                    elif 'GPU use' in line and current_gpu is not None:
+                        usage_match = re.search(r'(\d+)%', line)
+                        if usage_match:
+                            current_gpu['usage'] = int(usage_match.group(1))
+
+                    # Parse memory info
+                    elif 'Memory' in line and current_gpu is not None:
+                        mem_match = re.search(r'(\d+)MB / (\d+)MB', line)
+                        if mem_match:
+                            current_gpu['memory_used'] = int(mem_match.group(1))
+                            current_gpu['memory_total'] = int(mem_match.group(2))
+
+                return stats if stats['cards'] else None
+
+        except Exception as e:
+            logger.debug(f"rocm-smi stats not available: {e}")
+            return None
+
+    def _get_sys_gpu_info(self) -> Dict[str, Any]:
+        """Get GPU info from /sys filesystem"""
+        try:
+            import os
+            gpu_info = {'available': True, 'driver': 'sysfs', 'cards': []}
+
+            drm_path = '/sys/class/drm'
+            if os.path.exists(drm_path):
+                card_dirs = [d for d in os.listdir(drm_path) if d.startswith('card') and not d.endswith('-')]
+
+                for card_dir in sorted(card_dirs):
+                    card_path = os.path.join(drm_path, card_dir)
+                    device_path = os.path.join(card_path, 'device')
+
+                    gpu_name = "Unknown GPU"
+
+                    # Try to get GPU name from various sources
+                    for name_file in ['product_name', 'device/product_name']:
+                        name_path = os.path.join(card_path, name_file)
+                        if os.path.exists(name_path):
+                            try:
+                                with open(name_path, 'r') as f:
+                                    gpu_name = f.read().strip()
+                                break
+                            except:
+                                pass
+
+                    # Try vendor and device IDs
+                    if gpu_name == "Unknown GPU":
+                        try:
+                            vendor_path = os.path.join(device_path, 'vendor')
+                            device_id_path = os.path.join(device_path, 'device')
+
+                            if os.path.exists(vendor_path) and os.path.exists(device_id_path):
+                                with open(vendor_path, 'r') as f:
+                                    vendor = f.read().strip()
+                                with open(device_id_path, 'r') as f:
+                                    device_id = f.read().strip()
+
+                                if vendor == '0x1002':  # AMD vendor ID
+                                    gpu_name = f"AMD GPU ({device_id})"
+                        except:
+                            pass
+
+                    gpu_info['cards'].append({
+                        'id': card_dir,
+                        'name': gpu_name,
+                        'path': card_path
+                    })
+
+            return gpu_info if gpu_info['cards'] else {'available': False}
+
+        except Exception as e:
+            logger.error(f"Error getting sysfs GPU info: {e}")
+            return {'available': False, 'error': str(e)}
+
+    def _get_fallback_stats(self) -> Dict[str, Any]:
+        """Get basic GPU stats from /sys filesystem"""
+        try:
+            import os
+            stats = {'available': True, 'cards': []}
+
+            # Try to read basic info from sysfs
+            drm_path = '/sys/class/drm'
+            if os.path.exists(drm_path):
+                card_dirs = [d for d in os.listdir(drm_path) if d.startswith('card') and not d.endswith('-')]
+
+                for card_dir in sorted(card_dirs):
+                    card_path = os.path.join(drm_path, card_dir)
+                    device_path = os.path.join(card_path, 'device')
+
+                    gpu_stats = {
+                        'id': card_dir,
+                        'temperature': None,
+                        'usage': None,
+                        'memory_used': None,
+                        'memory_total': None
+                    }
+
+                    # Try to read temperature from hwmon
+                    hwmon_path = os.path.join(device_path, 'hwmon')
+                    if os.path.exists(hwmon_path):
+                        for hwmon_dir in os.listdir(hwmon_path):
+                            temp_file = os.path.join(hwmon_path, hwmon_dir, 'temp1_input')
+                            if os.path.exists(temp_file):
+                                try:
+                                    with open(temp_file, 'r') as f:
+                                        temp_millicelsius = int(f.read().strip())
+                                        gpu_stats['temperature'] = temp_millicelsius / 1000.0
+                                    break
+                                except:
+                                    pass
+
+                    # Try to read GPU usage (if available)
+                    gpu_busy_file = os.path.join(device_path, 'gpu_busy_percent')
+                    if os.path.exists(gpu_busy_file):
+                        try:
+                            with open(gpu_busy_file, 'r') as f:
+                                gpu_stats['usage'] = int(f.read().strip())
+                        except:
+                            pass
+
+                    stats['cards'].append(gpu_stats)
+
+            return stats if stats['cards'] else {'available': False}
+
+        except Exception as e:
+            logger.error(f"Error getting fallback GPU stats: {e}")
+            return {'available': False, 'error': str(e)}
+
+    def get_primary_gpu_stats(self) -> Dict[str, Any]:
+        """Get stats for the primary/first GPU"""
+        all_stats = self.get_gpu_stats()
+
+        if not all_stats.get('available') or not all_stats.get('cards'):
+            return {
+                'available': False,
+                'usage': 0,
+                'temperature': 0,
+                'memory_percent': 0
+            }
+
+        primary_gpu = all_stats['cards'][0]
+
+        # Calculate memory percentage
+        memory_percent = 0
+        if (primary_gpu.get('memory_used') is not None and
+                primary_gpu.get('memory_total') is not None and
+                primary_gpu['memory_total'] > 0):
+            memory_percent = (primary_gpu['memory_used'] / primary_gpu['memory_total']) * 100
+
+        return {
+            'available': True,
+            'usage': primary_gpu.get('usage', 0) or 0,
+            'temperature': primary_gpu.get('temperature', 0) or 0,
+            'memory_percent': memory_percent,
+            'memory_used': primary_gpu.get('memory_used', 0) or 0,
+            'memory_total': primary_gpu.get('memory_total', 0) or 0
+        }
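Reviewer note: to smoke-test the new AMD module on its own, a minimal sketch along these lines should work, assuming it is run from src/utils or with that directory on PYTHONPATH (matching the plain "from gpu_amd_monitor import ..." style this PR uses); the script name is illustrative, the methods are the ones defined above.

    # smoke_test_amd.py (illustrative name) -- exercises the public API of GPUAMDMonitor
    import logging
    from gpu_amd_monitor import GPUAMDMonitor

    logging.basicConfig(level=logging.DEBUG)

    monitor = GPUAMDMonitor()
    print("AMD tooling detected:", monitor.gpu_available)
    print("Static info:", monitor.get_gpu_info())            # cached after the first call
    print("Live stats:", monitor.get_gpu_stats())            # refreshed at most every cache_duration (2 s)
    print("Primary card:", monitor.get_primary_gpu_stats())  # flattened dict, convenient for UI bindings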
diff --git a/src/utils/gpu_monitor.py b/src/utils/gpu_monitor.py
index 4dc600a..4042494 100644
--- a/src/utils/gpu_monitor.py
+++ b/src/utils/gpu_monitor.py
@@ -6,6 +6,8 @@ from dataclasses import dataclass, field
 from typing import Dict, Any, Optional, List
 from enum import Enum
 from nicegui import binding
+from gpu_amd_monitor import GPUAMDMonitor
+from gpu_nvidia_monitor import GPUNVIDIAMonitor
 
 logger = logging.getLogger(__name__)
 
@@ -139,522 +141,8 @@ class GPUMonitor:
             logger.error(f"Error updating GPU stats: {e}")
 
 
-class GPUAMDMonitor:
-    def __init__(self):
-        self.last_update = None
-        self.cache_duration = 2  # seconds
-        self._cached_data = {}
-        self.gpu_available = self._check_gpu_availability()
-
-    def _check_gpu_availability(self) -> bool:
-        """Check if AMD GPU monitoring tools are available"""
-        try:
-            # Check for rocm-smi (AMD)
-            result = subprocess.run(['rocm-smi', '--help'],
-                                    capture_output=True, text=True, timeout=5)
-            if result.returncode == 0:
-                return True
-        except (subprocess.TimeoutExpired, FileNotFoundError):
-            pass
-
-        try:
-            # Check for radeontop
-            result = subprocess.run(['radeontop', '--help'],
-                                    capture_output=True, text=True, timeout=5)
-            if result.returncode == 0:
-                return True
-        except (subprocess.TimeoutExpired, FileNotFoundError):
-            pass
-
-        # Check for GPU in /sys/class/drm
-        try:
-            import os
-            gpu_dirs = [d for d in os.listdir('/sys/class/drm') if d.startswith('card')]
-            return len(gpu_dirs) > 0
-        except:
-            pass
-
-        return False
-
-    def get_gpu_info(self) -> Dict[str, Any]:
-        """Get static GPU information"""
-        if not self.gpu_available:
-            return {'available': False, 'message': 'No GPU monitoring tools found'}
-
-        if not self._cached_data.get('gpu_info'):
-            try:
-                gpu_info = self._get_rocm_info()
-                if not gpu_info:
-                    gpu_info = self._get_sys_gpu_info()
-
-                self._cached_data['gpu_info'] = gpu_info
-            except Exception as e:
-                logger.error(f"Error getting GPU info: {e}")
-                self._cached_data['gpu_info'] = {'available': False, 'error': str(e)}
-
-        return self._cached_data['gpu_info']
-
-    def get_gpu_stats(self) -> Dict[str, Any]:
-        """Get real-time GPU statistics"""
-        if not self.gpu_available:
-            return {'available': False}
-
-        now = time.time()
-        if (self.last_update is None or
-                now - self.last_update > self.cache_duration):
-
-            try:
-                stats = self._get_rocm_stats()
-                if not stats:
-                    stats = self._get_fallback_stats()
-
-                stats['timestamp'] = now
-                self._cached_data['stats'] = stats
-                self.last_update = now
-
-            except Exception as e:
-                logger.error(f"Error getting GPU stats: {e}")
-                self._cached_data['stats'] = {'available': False, 'error': str(e)}
-
-        return self._cached_data.get('stats', {'available': False})
-
-    def _get_rocm_info(self) -> Optional[Dict[str, Any]]:
-        """Get GPU info using rocm-smi"""
-        try:
-            result = subprocess.run(['rocm-smi', '--showid', '--showproductname'],
-                                    capture_output=True, text=True, timeout=10)
-
-            if result.returncode == 0:
-                lines = result.stdout.strip().split('\n')
-                gpu_info = {'available': True, 'driver': 'rocm-smi', 'cards': []}
-
-                for line in lines:
-                    if 'GPU[' in line and ':' in line:
-                        # Parse GPU ID and name
-                        parts = line.split(':')
-                        if len(parts) >= 2:
-                            gpu_id = parts[0].strip()
-                            gpu_name = parts[1].strip()
-                            gpu_info['cards'].append({
-                                'id': gpu_id,
-                                'name': gpu_name
-                            })
-
-                return gpu_info if gpu_info['cards'] else None
-
-        except Exception as e:
-            logger.debug(f"rocm-smi not available: {e}")
-            return None
-
-    def _get_rocm_stats(self) -> Optional[Dict[str, Any]]:
-        """Get GPU stats using rocm-smi"""
-        try:
-            # Get temperature, usage, and memory info
-            result = subprocess.run(['rocm-smi', '--showtemp', '--showuse', '--showmeminfo'],
-                                    capture_output=True, text=True, timeout=10)
-
-            if result.returncode == 0:
-                stats = {'available': True, 'cards': []}
-
-                lines = result.stdout.strip().split('\n')
-                current_gpu = None
-
-                for line in lines:
-                    line = line.strip()
-
-                    # Parse GPU identifier
-                    if line.startswith('GPU['):
-                        gpu_match = re.search(r'GPU\[(\d+)\]', line)
-                        if gpu_match:
-                            current_gpu = {
-                                'id': int(gpu_match.group(1)),
-                                'temperature': None,
-                                'usage': None,
-                                'memory_used': None,
-                                'memory_total': None
-                            }
-                            stats['cards'].append(current_gpu)
-
-                    # Parse temperature
-                    elif 'Temperature' in line and current_gpu is not None:
-                        temp_match = re.search(r'(\d+\.\d+)°C', line)
-                        if temp_match:
-                            current_gpu['temperature'] = float(temp_match.group(1))
-
-                    # Parse GPU usage
-                    elif 'GPU use' in line and current_gpu is not None:
-                        usage_match = re.search(r'(\d+)%', line)
-                        if usage_match:
-                            current_gpu['usage'] = int(usage_match.group(1))
-
-                    # Parse memory info
-                    elif 'Memory' in line and current_gpu is not None:
-                        mem_match = re.search(r'(\d+)MB / (\d+)MB', line)
-                        if mem_match:
-                            current_gpu['memory_used'] = int(mem_match.group(1))
-                            current_gpu['memory_total'] = int(mem_match.group(2))
-
-                return stats if stats['cards'] else None
-
-        except Exception as e:
-            logger.debug(f"rocm-smi stats not available: {e}")
-            return None
-
-    def _get_sys_gpu_info(self) -> Dict[str, Any]:
-        """Get GPU info from /sys filesystem"""
-        try:
-            import os
-            gpu_info = {'available': True, 'driver': 'sysfs', 'cards': []}
-
-            drm_path = '/sys/class/drm'
-            if os.path.exists(drm_path):
-                card_dirs = [d for d in os.listdir(drm_path) if d.startswith('card') and not d.endswith('-')]
-
-                for card_dir in sorted(card_dirs):
-                    card_path = os.path.join(drm_path, card_dir)
-                    device_path = os.path.join(card_path, 'device')
-
-                    gpu_name = "Unknown GPU"
-
-                    # Try to get GPU name from various sources
-                    for name_file in ['product_name', 'device/product_name']:
-                        name_path = os.path.join(card_path, name_file)
-                        if os.path.exists(name_path):
-                            try:
-                                with open(name_path, 'r') as f:
-                                    gpu_name = f.read().strip()
-                                break
-                            except:
-                                pass
-
-                    # Try vendor and device IDs
-                    if gpu_name == "Unknown GPU":
-                        try:
-                            vendor_path = os.path.join(device_path, 'vendor')
-                            device_id_path = os.path.join(device_path, 'device')
-
-                            if os.path.exists(vendor_path) and os.path.exists(device_id_path):
-                                with open(vendor_path, 'r') as f:
-                                    vendor = f.read().strip()
-                                with open(device_id_path, 'r') as f:
-                                    device_id = f.read().strip()
-
-                                if vendor == '0x1002':  # AMD vendor ID
-                                    gpu_name = f"AMD GPU ({device_id})"
-                        except:
-                            pass
-
-                    gpu_info['cards'].append({
-                        'id': card_dir,
-                        'name': gpu_name,
-                        'path': card_path
-                    })
-
-            return gpu_info if gpu_info['cards'] else {'available': False}
-
-        except Exception as e:
-            logger.error(f"Error getting sysfs GPU info: {e}")
-            return {'available': False, 'error': str(e)}
-
-    def _get_fallback_stats(self) -> Dict[str, Any]:
-        """Get basic GPU stats from /sys filesystem"""
-        try:
-            import os
-            stats = {'available': True, 'cards': []}
-
-            # Try to read basic info from sysfs
-            drm_path = '/sys/class/drm'
-            if os.path.exists(drm_path):
-                card_dirs = [d for d in os.listdir(drm_path) if d.startswith('card') and not d.endswith('-')]
-
-                for card_dir in sorted(card_dirs):
-                    card_path = os.path.join(drm_path, card_dir)
-                    device_path = os.path.join(card_path, 'device')
-
-                    gpu_stats = {
-                        'id': card_dir,
-                        'temperature': None,
-                        'usage': None,
-                        'memory_used': None,
-                        'memory_total': None
-                    }
-
-                    # Try to read temperature from hwmon
-                    hwmon_path = os.path.join(device_path, 'hwmon')
-                    if os.path.exists(hwmon_path):
-                        for hwmon_dir in os.listdir(hwmon_path):
-                            temp_file = os.path.join(hwmon_path, hwmon_dir, 'temp1_input')
-                            if os.path.exists(temp_file):
-                                try:
-                                    with open(temp_file, 'r') as f:
-                                        temp_millicelsius = int(f.read().strip())
-                                        gpu_stats['temperature'] = temp_millicelsius / 1000.0
-                                    break
-                                except:
-                                    pass
-
-                    # Try to read GPU usage (if available)
-                    gpu_busy_file = os.path.join(device_path, 'gpu_busy_percent')
-                    if os.path.exists(gpu_busy_file):
-                        try:
-                            with open(gpu_busy_file, 'r') as f:
-                                gpu_stats['usage'] = int(f.read().strip())
-                        except:
-                            pass
-
-                    stats['cards'].append(gpu_stats)
-
-            return stats if stats['cards'] else {'available': False}
-
-        except Exception as e:
-            logger.error(f"Error getting fallback GPU stats: {e}")
-            return {'available': False, 'error': str(e)}
-
-    def get_primary_gpu_stats(self) -> Dict[str, Any]:
-        """Get stats for the primary/first GPU"""
-        all_stats = self.get_gpu_stats()
-
-        if not all_stats.get('available') or not all_stats.get('cards'):
-            return {
-                'available': False,
-                'usage': 0,
-                'temperature': 0,
-                'memory_percent': 0
-            }
-
-        primary_gpu = all_stats['cards'][0]
-
-        # Calculate memory percentage
-        memory_percent = 0
-        if (primary_gpu.get('memory_used') is not None and
-                primary_gpu.get('memory_total') is not None and
-                primary_gpu['memory_total'] > 0):
-            memory_percent = (primary_gpu['memory_used'] / primary_gpu['memory_total']) * 100
-
-        return {
-            'available': True,
-            'usage': primary_gpu.get('usage', 0) or 0,
-            'temperature': primary_gpu.get('temperature', 0) or 0,
-            'memory_percent': memory_percent,
-            'memory_used': primary_gpu.get('memory_used', 0) or 0,
-            'memory_total': primary_gpu.get('memory_total', 0) or 0
-        }
-
-
-class GPUNVIDIAMonitor:
-    def __init__(self):
-        self.last_update = None
-        self.cache_duration = 2  # seconds
-        self._cached_data = {}
-        self.gpu_available = self._check_gpu_availability()
-
-    def _check_gpu_availability(self) -> bool:
-        """Check if NVIDIA GPU monitoring tools are available"""
-        try:
-            # Check for nvidia-smi
-            result = subprocess.run(['nvidia-smi', '--help'],
-                                    capture_output=True, text=True, timeout=5)
-            if result.returncode == 0:
-                return True
-        except (subprocess.TimeoutExpired, FileNotFoundError):
-            pass
-
-        return False
-
-    def get_gpu_info(self) -> Dict[str, Any]:
-        """Get static GPU information"""
-        if not self.gpu_available:
-            return {'available': False, 'message': 'No NVIDIA GPU monitoring tools found'}
-
-        if not self._cached_data.get('gpu_info'):
-            try:
-                gpu_info = self._get_nvidia_info()
-                self._cached_data['gpu_info'] = gpu_info
-            except Exception as e:
-                logger.error(f"Error getting GPU info: {e}")
-                self._cached_data['gpu_info'] = {'available': False, 'error': str(e)}
-
-        return self._cached_data['gpu_info']
-
-    def get_gpu_stats(self) -> Dict[str, Any]:
-        """Get real-time GPU statistics"""
-        if not self.gpu_available:
-            return {'available': False}
-
-        now = time.time()
-        if (self.last_update is None or
-                now - self.last_update > self.cache_duration):
-
-            try:
-                stats = self._get_nvidia_stats()
-                stats['timestamp'] = now
-                self._cached_data['stats'] = stats
-                self.last_update = now
-
-            except Exception as e:
-                logger.error(f"Error getting GPU stats: {e}")
-                self._cached_data['stats'] = {'available': False, 'error': str(e)}
-
-        return self._cached_data.get('stats', {'available': False})
-
-    def _get_nvidia_info(self) -> Dict[str, Any]:
-        """Get GPU info using nvidia-smi"""
-        try:
-            # Get GPU name, driver version, and CUDA version
-            result = subprocess.run(['nvidia-smi', '--query-gpu=index,name,driver_version',
-                                     '--format=csv,noheader'],
-                                    capture_output=True, text=True, timeout=10)
-
-            if result.returncode == 0:
-                gpu_info = {'available': True, 'driver': 'nvidia-smi', 'cards': []}
-
-                # Get driver and CUDA version from general output
-                version_result = subprocess.run(['nvidia-smi'],
-                                                capture_output=True, text=True, timeout=10)
-                if version_result.returncode == 0:
-                    # Parse driver version
-                    driver_match = re.search(r'Driver Version:\s*(\S+)', version_result.stdout)
-                    if driver_match:
-                        gpu_info['driver_version'] = driver_match.group(1)
-
-                    # Parse CUDA version
-                    cuda_match = re.search(r'CUDA Version:\s*(\S+)', version_result.stdout)
-                    if cuda_match:
-                        gpu_info['cuda_version'] = cuda_match.group(1)
-
-                # Parse GPU info
-                lines = result.stdout.strip().split('\n')
-                for line in lines:
-                    parts = [p.strip() for p in line.split(',')]
-                    if len(parts) >= 3:
-                        gpu_info['cards'].append({
-                            'id': int(parts[0]),
-                            'name': parts[1],
-                            'driver_version': parts[2]
-                        })
-
-                return gpu_info if gpu_info['cards'] else {'available': False}
-
-            return {'available': False}
-
-        except Exception as e:
-            logger.debug(f"nvidia-smi not available: {e}")
-            return {'available': False, 'error': str(e)}
-
-    def _get_nvidia_stats(self) -> Dict[str, Any]:
-        """Get GPU stats using nvidia-smi"""
-        try:
-            # Query multiple metrics at once
-            result = subprocess.run([
-                'nvidia-smi',
-                '--query-gpu=index,temperature.gpu,utilization.gpu,memory.used,memory.total,power.draw,power.limit',
-                '--format=csv,noheader,nounits'
-            ], capture_output=True, text=True, timeout=10)
-
-            if result.returncode == 0:
-                stats = {'available': True, 'cards': []}
-
-                lines = result.stdout.strip().split('\n')
-                for line in lines:
-                    parts = [p.strip() for p in line.split(',')]
-                    if len(parts) >= 5:
-                        gpu_stats = {
-                            'id': int(parts[0]),
-                            'temperature': None,
-                            'usage': None,
-                            'memory_used': None,
-                            'memory_total': None,
-                            'power_draw': None,
-                            'power_limit': None
-                        }
-
-                        # Parse temperature
-                        if parts[1] and parts[1] != '[N/A]':
-                            try:
-                                gpu_stats['temperature'] = float(parts[1])
-                            except ValueError:
-                                pass
-
-                        # Parse GPU utilization
-                        if parts[2] and parts[2] != '[N/A]':
-                            try:
-                                gpu_stats['usage'] = int(parts[2])
-                            except ValueError:
-                                pass
-
-                        # Parse memory usage
-                        if parts[3] and parts[3] != '[N/A]':
-                            try:
-                                gpu_stats['memory_used'] = int(parts[3])
-                            except ValueError:
-                                pass
-
-                        if parts[4] and parts[4] != '[N/A]':
-                            try:
-                                gpu_stats['memory_total'] = int(parts[4])
-                            except ValueError:
-                                pass
-
-                        # Parse power stats if available
-                        if len(parts) >= 7:
-                            if parts[5] and parts[5] != '[N/A]':
-                                try:
-                                    gpu_stats['power_draw'] = float(parts[5])
-                                except ValueError:
-                                    pass
-
-                            if parts[6] and parts[6] != '[N/A]':
-                                try:
-                                    gpu_stats['power_limit'] = float(parts[6])
-                                except ValueError:
-                                    pass
-
-                        stats['cards'].append(gpu_stats)
-
-                return stats if stats['cards'] else {'available': False}
-
-            return {'available': False}
-
-        except Exception as e:
-            logger.debug(f"nvidia-smi stats not available: {e}")
-            return {'available': False, 'error': str(e)}
-
-    def get_primary_gpu_stats(self) -> Dict[str, Any]:
-        """Get stats for the primary/first GPU"""
-        all_stats = self.get_gpu_stats()
-
-        if not all_stats.get('available') or not all_stats.get('cards'):
-            return {
-                'available': False,
-                'usage': 0,
-                'temperature': 0,
-                'memory_percent': 0,
-                'power_draw': 0,
-                'power_limit': 0
-            }
-
-        primary_gpu = all_stats['cards'][0]
-
-        # Calculate memory percentage
-        memory_percent = 0
-        if (primary_gpu.get('memory_used') is not None and
-                primary_gpu.get('memory_total') is not None and
-                primary_gpu['memory_total'] > 0):
-            memory_percent = (primary_gpu['memory_used'] / primary_gpu['memory_total']) * 100
-
-        return {
-            'available': True,
-            'usage': primary_gpu.get('usage', 0) or 0,
-            'temperature': primary_gpu.get('temperature', 0) or 0,
-            'memory_percent': memory_percent,
-            'memory_used': primary_gpu.get('memory_used', 0) or 0,
-            'memory_total': primary_gpu.get('memory_total', 0) or 0,
-            'power_draw': primary_gpu.get('power_draw', 0) or 0,
-            'power_limit': primary_gpu.get('power_limit', 0) or 0
-        }
-
-
 if __name__ == "__main__":
     monitor = GPUMonitor()
     from pprint import pprint
     print(monitor)
+    print(monitor.gpu_list)
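Reviewer note: this hunk only adds the two imports; how GPUMonitor actually selects between the AMD and NVIDIA backends is not shown in the diff. The sketch below is therefore purely illustrative of one way to dispatch on vendor using only the attributes defined by the two classes; pick_gpu_backend is a hypothetical helper, not part of this change.

    # Hypothetical helper (not in this PR): return the first backend whose probe found a GPU.
    from typing import Optional, Union

    from gpu_amd_monitor import GPUAMDMonitor
    from gpu_nvidia_monitor import GPUNVIDIAMonitor


    def pick_gpu_backend() -> Optional[Union[GPUAMDMonitor, GPUNVIDIAMonitor]]:
        """Try NVIDIA first, then AMD; return None if neither tool/sysfs path is available."""
        for backend_cls in (GPUNVIDIAMonitor, GPUAMDMonitor):
            backend = backend_cls()
            if backend.gpu_available:
                return backend
        return None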
diff --git a/src/utils/gpu_nvidia_monitor.py b/src/utils/gpu_nvidia_monitor.py
new file mode 100644
index 0000000..b980cd4
--- /dev/null
+++ b/src/utils/gpu_nvidia_monitor.py
@@ -0,0 +1,220 @@
+import subprocess
+import re
+import time
+import logging
+from typing import Dict, Any
+
+logger = logging.getLogger(__name__)
+
+
+class GPUNVIDIAMonitor:
+    def __init__(self):
+        self.last_update = None
+        self.cache_duration = 2  # seconds
+        self._cached_data = {}
+        self.gpu_available = self._check_gpu_availability()
+
+    def _check_gpu_availability(self) -> bool:
+        """Check if NVIDIA GPU monitoring tools are available"""
+        try:
+            # Check for nvidia-smi
+            result = subprocess.run(['nvidia-smi', '--help'],
+                                    capture_output=True, text=True, timeout=5)
+            if result.returncode == 0:
+                return True
+        except (subprocess.TimeoutExpired, FileNotFoundError):
+            pass
+
+        return False
+
+    def get_gpu_info(self) -> Dict[str, Any]:
+        """Get static GPU information"""
+        if not self.gpu_available:
+            return {'available': False, 'message': 'No NVIDIA GPU monitoring tools found'}
+
+        if not self._cached_data.get('gpu_info'):
+            try:
+                gpu_info = self._get_nvidia_info()
+                self._cached_data['gpu_info'] = gpu_info
+            except Exception as e:
+                logger.error(f"Error getting GPU info: {e}")
+                self._cached_data['gpu_info'] = {'available': False, 'error': str(e)}
+
+        return self._cached_data['gpu_info']
+
+    def get_gpu_stats(self) -> Dict[str, Any]:
+        """Get real-time GPU statistics"""
+        if not self.gpu_available:
+            return {'available': False}
+
+        now = time.time()
+        if (self.last_update is None or
+                now - self.last_update > self.cache_duration):
+
+            try:
+                stats = self._get_nvidia_stats()
+                stats['timestamp'] = now
+                self._cached_data['stats'] = stats
+                self.last_update = now
+
+            except Exception as e:
+                logger.error(f"Error getting GPU stats: {e}")
+                self._cached_data['stats'] = {'available': False, 'error': str(e)}
+
+        return self._cached_data.get('stats', {'available': False})
+
+    def _get_nvidia_info(self) -> Dict[str, Any]:
+        """Get GPU info using nvidia-smi"""
+        try:
+            # Get GPU name, driver version, and CUDA version
+            result = subprocess.run(['nvidia-smi', '--query-gpu=index,name,driver_version',
+                                     '--format=csv,noheader'],
+                                    capture_output=True, text=True, timeout=10)
+
+            if result.returncode == 0:
+                gpu_info = {'available': True, 'driver': 'nvidia-smi', 'cards': []}
+
+                # Get driver and CUDA version from general output
+                version_result = subprocess.run(['nvidia-smi'],
+                                                capture_output=True, text=True, timeout=10)
+                if version_result.returncode == 0:
+                    # Parse driver version
+                    driver_match = re.search(r'Driver Version:\s*(\S+)', version_result.stdout)
+                    if driver_match:
+                        gpu_info['driver_version'] = driver_match.group(1)
+
+                    # Parse CUDA version
+                    cuda_match = re.search(r'CUDA Version:\s*(\S+)', version_result.stdout)
+                    if cuda_match:
+                        gpu_info['cuda_version'] = cuda_match.group(1)
+
+                # Parse GPU info
+                lines = result.stdout.strip().split('\n')
+                for line in lines:
+                    parts = [p.strip() for p in line.split(',')]
+                    if len(parts) >= 3:
+                        gpu_info['cards'].append({
+                            'id': int(parts[0]),
+                            'name': parts[1],
+                            'driver_version': parts[2]
+                        })
+
+                return gpu_info if gpu_info['cards'] else {'available': False}
+
+            return {'available': False}
+
+        except Exception as e:
+            logger.debug(f"nvidia-smi not available: {e}")
+            return {'available': False, 'error': str(e)}
+
+    def _get_nvidia_stats(self) -> Dict[str, Any]:
+        """Get GPU stats using nvidia-smi"""
+        try:
+            # Query multiple metrics at once
+            result = subprocess.run([
+                'nvidia-smi',
+                '--query-gpu=index,temperature.gpu,utilization.gpu,memory.used,memory.total,power.draw,power.limit',
+                '--format=csv,noheader,nounits'
+            ], capture_output=True, text=True, timeout=10)
+
+            if result.returncode == 0:
+                stats = {'available': True, 'cards': []}
+
+                lines = result.stdout.strip().split('\n')
+                for line in lines:
+                    parts = [p.strip() for p in line.split(',')]
+                    if len(parts) >= 5:
+                        gpu_stats = {
+                            'id': int(parts[0]),
+                            'temperature': None,
+                            'usage': None,
+                            'memory_used': None,
+                            'memory_total': None,
+                            'power_draw': None,
+                            'power_limit': None
+                        }
+
+                        # Parse temperature
+                        if parts[1] and parts[1] != '[N/A]':
+                            try:
+                                gpu_stats['temperature'] = float(parts[1])
+                            except ValueError:
+                                pass
+
+                        # Parse GPU utilization
+                        if parts[2] and parts[2] != '[N/A]':
+                            try:
+                                gpu_stats['usage'] = int(parts[2])
+                            except ValueError:
+                                pass
+
+                        # Parse memory usage
+                        if parts[3] and parts[3] != '[N/A]':
+                            try:
+                                gpu_stats['memory_used'] = int(parts[3])
+                            except ValueError:
+                                pass
+
+                        if parts[4] and parts[4] != '[N/A]':
+                            try:
+                                gpu_stats['memory_total'] = int(parts[4])
+                            except ValueError:
+                                pass
+
+                        # Parse power stats if available
+                        if len(parts) >= 7:
+                            if parts[5] and parts[5] != '[N/A]':
+                                try:
+                                    gpu_stats['power_draw'] = float(parts[5])
+                                except ValueError:
+                                    pass
+
+                            if parts[6] and parts[6] != '[N/A]':
+                                try:
+                                    gpu_stats['power_limit'] = float(parts[6])
+                                except ValueError:
+                                    pass
+
+                        stats['cards'].append(gpu_stats)
+
+                return stats if stats['cards'] else {'available': False}
+
+            return {'available': False}
+
+        except Exception as e:
+            logger.debug(f"nvidia-smi stats not available: {e}")
+            return {'available': False, 'error': str(e)}
+
+    def get_primary_gpu_stats(self) -> Dict[str, Any]:
+        """Get stats for the primary/first GPU"""
+        all_stats = self.get_gpu_stats()
+
+        if not all_stats.get('available') or not all_stats.get('cards'):
+            return {
+                'available': False,
+                'usage': 0,
+                'temperature': 0,
+                'memory_percent': 0,
+                'power_draw': 0,
+                'power_limit': 0
+            }
+
+        primary_gpu = all_stats['cards'][0]
+
+        # Calculate memory percentage
+        memory_percent = 0
+        if (primary_gpu.get('memory_used') is not None and
+                primary_gpu.get('memory_total') is not None and
+                primary_gpu['memory_total'] > 0):
+            memory_percent = (primary_gpu['memory_used'] / primary_gpu['memory_total']) * 100
+
+        return {
+            'available': True,
+            'usage': primary_gpu.get('usage', 0) or 0,
+            'temperature': primary_gpu.get('temperature', 0) or 0,
+            'memory_percent': memory_percent,
+            'memory_used': primary_gpu.get('memory_used', 0) or 0,
+            'memory_total': primary_gpu.get('memory_total', 0) or 0,
+            'power_draw': primary_gpu.get('power_draw', 0) or 0,
+            'power_limit': primary_gpu.get('power_limit', 0) or 0
+        }
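Reviewer note: _get_nvidia_stats relies on the field order of the --query-gpu list above (index, temperature.gpu, utilization.gpu, memory.used, memory.total, power.draw, power.limit) with --format=csv,noheader,nounits. The snippet below is only a sketch that mimics the same splitting and [N/A] guarding against a made-up sample record (real values vary by GPU and driver), so the expected layout is easy to check at a glance.

    # Illustrative sample record; field order matches the query string in _get_nvidia_stats.
    sample = "0, 64, 37, 2048, 8192, 95.3, 220.0"

    parts = [p.strip() for p in sample.split(',')]
    record = {
        'id': int(parts[0]),
        'temperature': float(parts[1]) if parts[1] != '[N/A]' else None,  # degrees C
        'usage': int(parts[2]) if parts[2] != '[N/A]' else None,          # percent
        'memory_used': int(parts[3]) if parts[3] != '[N/A]' else None,    # MiB
        'memory_total': int(parts[4]) if parts[4] != '[N/A]' else None,   # MiB
        'power_draw': float(parts[5]) if parts[5] != '[N/A]' else None,   # watts
        'power_limit': float(parts[6]) if parts[6] != '[N/A]' else None,  # watts
    }
    print(record)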