init
This commit is contained in:
5
src/utils/__init__.py
Normal file
5
src/utils/__init__.py
Normal file
@@ -0,0 +1,5 @@
|
||||
from .system_monitor import SystemMonitor
|
||||
from .gpu_monitor import GPUMonitor
|
||||
from .data_manager import data_manager
|
||||
|
||||
__all__ = ['SystemMonitor', 'GPUMonitor', 'data_manager']
|
||||
142
src/utils/data_manager.py
Normal file
142
src/utils/data_manager.py
Normal file
@@ -0,0 +1,142 @@
|
||||
import asyncio
|
||||
import logging
|
||||
import threading
|
||||
import time
|
||||
from typing import Dict, Any
|
||||
from .system_monitor import SystemMonitor
|
||||
from .gpu_monitor import GPUMonitor
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DataManager:
|
||||
"""Global data manager that collects system information in the background"""
|
||||
|
||||
def __init__(self, update_interval: float = 1.0):
|
||||
self.update_interval = update_interval
|
||||
self.system_monitor = SystemMonitor()
|
||||
self.gpu_monitor = GPUMonitor()
|
||||
|
||||
self._data = {
|
||||
'system_info': {},
|
||||
'system_stats': {},
|
||||
'gpu_info': {},
|
||||
'gpu_stats': {},
|
||||
'last_update': 0
|
||||
}
|
||||
|
||||
self._running = False
|
||||
self._thread = None
|
||||
self._lock = threading.RLock()
|
||||
|
||||
def start(self):
|
||||
"""Start the background data collection"""
|
||||
if self._running:
|
||||
return
|
||||
|
||||
self._running = True
|
||||
self._thread = threading.Thread(target=self._update_loop, daemon=True)
|
||||
self._thread.start()
|
||||
logger.info("DataManager started")
|
||||
|
||||
def stop(self):
|
||||
"""Stop the background data collection"""
|
||||
self._running = False
|
||||
if self._thread and self._thread.is_alive():
|
||||
self._thread.join(timeout=5)
|
||||
logger.info("DataManager stopped")
|
||||
|
||||
def _update_loop(self):
|
||||
"""Background loop that updates system data"""
|
||||
while self._running:
|
||||
try:
|
||||
start_time = time.time()
|
||||
|
||||
# Collect system information
|
||||
with self._lock:
|
||||
# Static info (cached internally by monitors)
|
||||
self._data['system_info'] = self.system_monitor.get_system_info()
|
||||
self._data['gpu_info'] = self.gpu_monitor.get_gpu_info()
|
||||
|
||||
# Dynamic stats
|
||||
self._data['system_stats'] = self.system_monitor.get_system_stats()
|
||||
self._data['gpu_stats'] = self.gpu_monitor.get_primary_gpu_stats()
|
||||
self._data['last_update'] = time.time()
|
||||
|
||||
# Calculate sleep time to maintain consistent intervals
|
||||
elapsed = time.time() - start_time
|
||||
sleep_time = max(0, self.update_interval - elapsed)
|
||||
|
||||
if sleep_time > 0:
|
||||
time.sleep(sleep_time)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in data update loop: {e}")
|
||||
time.sleep(1) # Brief pause before retrying
|
||||
|
||||
def get_dashboard_data(self) -> Dict[str, Any]:
|
||||
"""Get all data needed for the dashboard"""
|
||||
with self._lock:
|
||||
stats = self._data['system_stats']
|
||||
gpu_stats = self._data['gpu_stats']
|
||||
|
||||
# Format data for dashboard consumption
|
||||
return {
|
||||
'cpu': {
|
||||
'percent': round(stats.get('cpu', {}).get('percent', 0), 1),
|
||||
'count': stats.get('cpu', {}).get('count', 0)
|
||||
},
|
||||
'memory': {
|
||||
'percent': round(stats.get('memory', {}).get('percent', 0), 1),
|
||||
'used_gb': round(stats.get('memory', {}).get('used', 0) / (1024**3), 1),
|
||||
'total_gb': round(stats.get('memory', {}).get('total', 0) / (1024**3), 1)
|
||||
},
|
||||
'gpu': {
|
||||
'percent': round(gpu_stats.get('usage', 0), 1),
|
||||
'temperature': round(gpu_stats.get('temperature', 0), 1),
|
||||
'available': gpu_stats.get('available', False)
|
||||
},
|
||||
'processes': {
|
||||
'count': stats.get('processes', {}).get('count', 0)
|
||||
},
|
||||
'disk': {
|
||||
'percent': round(stats.get('disk', {}).get('percent', 0), 1)
|
||||
},
|
||||
'network': {
|
||||
'bytes_sent': stats.get('network', {}).get('bytes_sent', 0),
|
||||
'bytes_recv': stats.get('network', {}).get('bytes_recv', 0)
|
||||
},
|
||||
'last_update': self._data['last_update']
|
||||
}
|
||||
|
||||
def get_system_info(self) -> Dict[str, Any]:
|
||||
"""Get static system information"""
|
||||
with self._lock:
|
||||
return self._data['system_info'].copy()
|
||||
|
||||
def get_system_stats(self) -> Dict[str, Any]:
|
||||
"""Get current system statistics"""
|
||||
with self._lock:
|
||||
return self._data['system_stats'].copy()
|
||||
|
||||
def get_gpu_info(self) -> Dict[str, Any]:
|
||||
"""Get static GPU information"""
|
||||
with self._lock:
|
||||
return self._data['gpu_info'].copy()
|
||||
|
||||
def get_gpu_stats(self) -> Dict[str, Any]:
|
||||
"""Get current GPU statistics"""
|
||||
with self._lock:
|
||||
return self._data['gpu_stats'].copy()
|
||||
|
||||
def get_processes(self, limit: int = 10) -> list:
|
||||
"""Get top processes (fetched on demand to avoid overhead)"""
|
||||
return self.system_monitor.get_processes(limit)
|
||||
|
||||
def format_bytes(self, bytes_value: int) -> str:
|
||||
"""Format bytes to human readable format"""
|
||||
return self.system_monitor.format_bytes(bytes_value)
|
||||
|
||||
|
||||
# Global instance
|
||||
data_manager = DataManager()
|
||||
308
src/utils/gpu_monitor.py
Normal file
308
src/utils/gpu_monitor.py
Normal file
@@ -0,0 +1,308 @@
|
||||
import subprocess
|
||||
import re
|
||||
import time
|
||||
import logging
|
||||
from typing import Dict, Any, Optional, List
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class GPUMonitor:
|
||||
def __init__(self):
|
||||
self.last_update = None
|
||||
self.cache_duration = 2 # seconds
|
||||
self._cached_data = {}
|
||||
self.gpu_available = self._check_gpu_availability()
|
||||
|
||||
def _check_gpu_availability(self) -> bool:
|
||||
"""Check if AMD GPU monitoring tools are available"""
|
||||
try:
|
||||
# Check for rocm-smi (AMD)
|
||||
result = subprocess.run(['rocm-smi', '--help'],
|
||||
capture_output=True, text=True, timeout=5)
|
||||
if result.returncode == 0:
|
||||
return True
|
||||
except (subprocess.TimeoutExpired, FileNotFoundError):
|
||||
pass
|
||||
|
||||
try:
|
||||
# Check for radeontop
|
||||
result = subprocess.run(['radeontop', '--help'],
|
||||
capture_output=True, text=True, timeout=5)
|
||||
if result.returncode == 0:
|
||||
return True
|
||||
except (subprocess.TimeoutExpired, FileNotFoundError):
|
||||
pass
|
||||
|
||||
# Check for GPU in /sys/class/drm
|
||||
try:
|
||||
import os
|
||||
gpu_dirs = [d for d in os.listdir('/sys/class/drm') if d.startswith('card')]
|
||||
return len(gpu_dirs) > 0
|
||||
except:
|
||||
pass
|
||||
|
||||
return False
|
||||
|
||||
def get_gpu_info(self) -> Dict[str, Any]:
|
||||
"""Get static GPU information"""
|
||||
if not self.gpu_available:
|
||||
return {'available': False, 'message': 'No GPU monitoring tools found'}
|
||||
|
||||
if not self._cached_data.get('gpu_info'):
|
||||
try:
|
||||
gpu_info = self._get_rocm_info()
|
||||
if not gpu_info:
|
||||
gpu_info = self._get_sys_gpu_info()
|
||||
|
||||
self._cached_data['gpu_info'] = gpu_info
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting GPU info: {e}")
|
||||
self._cached_data['gpu_info'] = {'available': False, 'error': str(e)}
|
||||
|
||||
return self._cached_data['gpu_info']
|
||||
|
||||
def get_gpu_stats(self) -> Dict[str, Any]:
|
||||
"""Get real-time GPU statistics"""
|
||||
if not self.gpu_available:
|
||||
return {'available': False}
|
||||
|
||||
now = time.time()
|
||||
if (self.last_update is None or
|
||||
now - self.last_update > self.cache_duration):
|
||||
|
||||
try:
|
||||
stats = self._get_rocm_stats()
|
||||
if not stats:
|
||||
stats = self._get_fallback_stats()
|
||||
|
||||
stats['timestamp'] = now
|
||||
self._cached_data['stats'] = stats
|
||||
self.last_update = now
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting GPU stats: {e}")
|
||||
self._cached_data['stats'] = {'available': False, 'error': str(e)}
|
||||
|
||||
return self._cached_data.get('stats', {'available': False})
|
||||
|
||||
def _get_rocm_info(self) -> Optional[Dict[str, Any]]:
|
||||
"""Get GPU info using rocm-smi"""
|
||||
try:
|
||||
result = subprocess.run(['rocm-smi', '--showid', '--showproductname'],
|
||||
capture_output=True, text=True, timeout=10)
|
||||
|
||||
if result.returncode == 0:
|
||||
lines = result.stdout.strip().split('\n')
|
||||
gpu_info = {'available': True, 'driver': 'rocm-smi', 'cards': []}
|
||||
|
||||
for line in lines:
|
||||
if 'GPU[' in line and ':' in line:
|
||||
# Parse GPU ID and name
|
||||
parts = line.split(':')
|
||||
if len(parts) >= 2:
|
||||
gpu_id = parts[0].strip()
|
||||
gpu_name = parts[1].strip()
|
||||
gpu_info['cards'].append({
|
||||
'id': gpu_id,
|
||||
'name': gpu_name
|
||||
})
|
||||
|
||||
return gpu_info if gpu_info['cards'] else None
|
||||
|
||||
except Exception as e:
|
||||
logger.debug(f"rocm-smi not available: {e}")
|
||||
return None
|
||||
|
||||
def _get_rocm_stats(self) -> Optional[Dict[str, Any]]:
|
||||
"""Get GPU stats using rocm-smi"""
|
||||
try:
|
||||
# Get temperature, usage, and memory info
|
||||
result = subprocess.run(['rocm-smi', '--showtemp', '--showuse', '--showmeminfo'],
|
||||
capture_output=True, text=True, timeout=10)
|
||||
|
||||
if result.returncode == 0:
|
||||
stats = {'available': True, 'cards': []}
|
||||
|
||||
lines = result.stdout.strip().split('\n')
|
||||
current_gpu = None
|
||||
|
||||
for line in lines:
|
||||
line = line.strip()
|
||||
|
||||
# Parse GPU identifier
|
||||
if line.startswith('GPU['):
|
||||
gpu_match = re.search(r'GPU\[(\d+)\]', line)
|
||||
if gpu_match:
|
||||
current_gpu = {
|
||||
'id': int(gpu_match.group(1)),
|
||||
'temperature': None,
|
||||
'usage': None,
|
||||
'memory_used': None,
|
||||
'memory_total': None
|
||||
}
|
||||
stats['cards'].append(current_gpu)
|
||||
|
||||
# Parse temperature
|
||||
elif 'Temperature' in line and current_gpu is not None:
|
||||
temp_match = re.search(r'(\d+\.\d+)°C', line)
|
||||
if temp_match:
|
||||
current_gpu['temperature'] = float(temp_match.group(1))
|
||||
|
||||
# Parse GPU usage
|
||||
elif 'GPU use' in line and current_gpu is not None:
|
||||
usage_match = re.search(r'(\d+)%', line)
|
||||
if usage_match:
|
||||
current_gpu['usage'] = int(usage_match.group(1))
|
||||
|
||||
# Parse memory info
|
||||
elif 'Memory' in line and current_gpu is not None:
|
||||
mem_match = re.search(r'(\d+)MB / (\d+)MB', line)
|
||||
if mem_match:
|
||||
current_gpu['memory_used'] = int(mem_match.group(1))
|
||||
current_gpu['memory_total'] = int(mem_match.group(2))
|
||||
|
||||
return stats if stats['cards'] else None
|
||||
|
||||
except Exception as e:
|
||||
logger.debug(f"rocm-smi stats not available: {e}")
|
||||
return None
|
||||
|
||||
def _get_sys_gpu_info(self) -> Dict[str, Any]:
|
||||
"""Get GPU info from /sys filesystem"""
|
||||
try:
|
||||
import os
|
||||
gpu_info = {'available': True, 'driver': 'sysfs', 'cards': []}
|
||||
|
||||
drm_path = '/sys/class/drm'
|
||||
if os.path.exists(drm_path):
|
||||
card_dirs = [d for d in os.listdir(drm_path) if d.startswith('card') and not d.endswith('-')]
|
||||
|
||||
for card_dir in sorted(card_dirs):
|
||||
card_path = os.path.join(drm_path, card_dir)
|
||||
device_path = os.path.join(card_path, 'device')
|
||||
|
||||
gpu_name = "Unknown GPU"
|
||||
|
||||
# Try to get GPU name from various sources
|
||||
for name_file in ['product_name', 'device/product_name']:
|
||||
name_path = os.path.join(card_path, name_file)
|
||||
if os.path.exists(name_path):
|
||||
try:
|
||||
with open(name_path, 'r') as f:
|
||||
gpu_name = f.read().strip()
|
||||
break
|
||||
except:
|
||||
pass
|
||||
|
||||
# Try vendor and device IDs
|
||||
if gpu_name == "Unknown GPU":
|
||||
try:
|
||||
vendor_path = os.path.join(device_path, 'vendor')
|
||||
device_id_path = os.path.join(device_path, 'device')
|
||||
|
||||
if os.path.exists(vendor_path) and os.path.exists(device_id_path):
|
||||
with open(vendor_path, 'r') as f:
|
||||
vendor = f.read().strip()
|
||||
with open(device_id_path, 'r') as f:
|
||||
device_id = f.read().strip()
|
||||
|
||||
if vendor == '0x1002': # AMD vendor ID
|
||||
gpu_name = f"AMD GPU ({device_id})"
|
||||
except:
|
||||
pass
|
||||
|
||||
gpu_info['cards'].append({
|
||||
'id': card_dir,
|
||||
'name': gpu_name,
|
||||
'path': card_path
|
||||
})
|
||||
|
||||
return gpu_info if gpu_info['cards'] else {'available': False}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting sysfs GPU info: {e}")
|
||||
return {'available': False, 'error': str(e)}
|
||||
|
||||
def _get_fallback_stats(self) -> Dict[str, Any]:
|
||||
"""Get basic GPU stats from /sys filesystem"""
|
||||
try:
|
||||
import os
|
||||
stats = {'available': True, 'cards': []}
|
||||
|
||||
# Try to read basic info from sysfs
|
||||
drm_path = '/sys/class/drm'
|
||||
if os.path.exists(drm_path):
|
||||
card_dirs = [d for d in os.listdir(drm_path) if d.startswith('card') and not d.endswith('-')]
|
||||
|
||||
for card_dir in sorted(card_dirs):
|
||||
card_path = os.path.join(drm_path, card_dir)
|
||||
device_path = os.path.join(card_path, 'device')
|
||||
|
||||
gpu_stats = {
|
||||
'id': card_dir,
|
||||
'temperature': None,
|
||||
'usage': None,
|
||||
'memory_used': None,
|
||||
'memory_total': None
|
||||
}
|
||||
|
||||
# Try to read temperature from hwmon
|
||||
hwmon_path = os.path.join(device_path, 'hwmon')
|
||||
if os.path.exists(hwmon_path):
|
||||
for hwmon_dir in os.listdir(hwmon_path):
|
||||
temp_file = os.path.join(hwmon_path, hwmon_dir, 'temp1_input')
|
||||
if os.path.exists(temp_file):
|
||||
try:
|
||||
with open(temp_file, 'r') as f:
|
||||
temp_millicelsius = int(f.read().strip())
|
||||
gpu_stats['temperature'] = temp_millicelsius / 1000.0
|
||||
break
|
||||
except:
|
||||
pass
|
||||
|
||||
# Try to read GPU usage (if available)
|
||||
gpu_busy_file = os.path.join(device_path, 'gpu_busy_percent')
|
||||
if os.path.exists(gpu_busy_file):
|
||||
try:
|
||||
with open(gpu_busy_file, 'r') as f:
|
||||
gpu_stats['usage'] = int(f.read().strip())
|
||||
except:
|
||||
pass
|
||||
|
||||
stats['cards'].append(gpu_stats)
|
||||
|
||||
return stats if stats['cards'] else {'available': False}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting fallback GPU stats: {e}")
|
||||
return {'available': False, 'error': str(e)}
|
||||
|
||||
def get_primary_gpu_stats(self) -> Dict[str, Any]:
|
||||
"""Get stats for the primary/first GPU"""
|
||||
all_stats = self.get_gpu_stats()
|
||||
|
||||
if not all_stats.get('available') or not all_stats.get('cards'):
|
||||
return {
|
||||
'available': False,
|
||||
'usage': 0,
|
||||
'temperature': 0,
|
||||
'memory_percent': 0
|
||||
}
|
||||
|
||||
primary_gpu = all_stats['cards'][0]
|
||||
|
||||
# Calculate memory percentage
|
||||
memory_percent = 0
|
||||
if (primary_gpu.get('memory_used') is not None and
|
||||
primary_gpu.get('memory_total') is not None and
|
||||
primary_gpu['memory_total'] > 0):
|
||||
memory_percent = (primary_gpu['memory_used'] / primary_gpu['memory_total']) * 100
|
||||
|
||||
return {
|
||||
'available': True,
|
||||
'usage': primary_gpu.get('usage', 0) or 0,
|
||||
'temperature': primary_gpu.get('temperature', 0) or 0,
|
||||
'memory_percent': memory_percent,
|
||||
'memory_used': primary_gpu.get('memory_used', 0) or 0,
|
||||
'memory_total': primary_gpu.get('memory_total', 0) or 0
|
||||
}
|
||||
187
src/utils/system_monitor.py
Normal file
187
src/utils/system_monitor.py
Normal file
@@ -0,0 +1,187 @@
|
||||
import psutil
|
||||
import platform
|
||||
import time
|
||||
import logging
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Dict, Any
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class SystemMonitor:
|
||||
def __init__(self):
|
||||
self.last_update = None
|
||||
self.cache_duration = 2 # seconds
|
||||
self._cached_data = {}
|
||||
|
||||
def get_system_info(self) -> Dict[str, Any]:
|
||||
"""Get static system information"""
|
||||
if not self._cached_data.get('system_info'):
|
||||
try:
|
||||
uname = platform.uname()
|
||||
boot_time = datetime.fromtimestamp(psutil.boot_time())
|
||||
uptime = datetime.now() - boot_time
|
||||
|
||||
self._cached_data['system_info'] = {
|
||||
'os': f"{uname.system}",
|
||||
'kernel': uname.release,
|
||||
'cpu': self._get_cpu_info(),
|
||||
'memory_total': psutil.virtual_memory().total,
|
||||
'uptime': self._format_uptime(uptime),
|
||||
'hostname': uname.node,
|
||||
'architecture': uname.machine
|
||||
}
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting system info: {e}")
|
||||
self._cached_data['system_info'] = {}
|
||||
|
||||
return self._cached_data['system_info']
|
||||
|
||||
def get_system_stats(self) -> Dict[str, Any]:
|
||||
"""Get real-time system statistics"""
|
||||
now = time.time()
|
||||
if (self.last_update is None or
|
||||
now - self.last_update > self.cache_duration):
|
||||
|
||||
try:
|
||||
# CPU stats
|
||||
cpu_percent = psutil.cpu_percent(interval=0.1)
|
||||
cpu_count = psutil.cpu_count()
|
||||
cpu_freq = psutil.cpu_freq()
|
||||
|
||||
# Memory stats
|
||||
memory = psutil.virtual_memory()
|
||||
swap = psutil.swap_memory()
|
||||
|
||||
# Disk stats
|
||||
disk = psutil.disk_usage('/')
|
||||
|
||||
# Network stats
|
||||
network = psutil.net_io_counters()
|
||||
|
||||
# Process count
|
||||
process_count = len(psutil.pids())
|
||||
|
||||
# Load average (Unix only)
|
||||
load_avg = None
|
||||
try:
|
||||
load_avg = psutil.getloadavg()
|
||||
except AttributeError:
|
||||
# Windows doesn't have load average
|
||||
pass
|
||||
|
||||
# Temperature (if available)
|
||||
temperatures = self._get_temperatures()
|
||||
|
||||
self._cached_data['stats'] = {
|
||||
'cpu': {
|
||||
'percent': cpu_percent,
|
||||
'count': cpu_count,
|
||||
'frequency': cpu_freq.current if cpu_freq else None,
|
||||
'load_avg': load_avg
|
||||
},
|
||||
'memory': {
|
||||
'total': memory.total,
|
||||
'available': memory.available,
|
||||
'used': memory.used,
|
||||
'percent': memory.percent,
|
||||
'free': memory.free
|
||||
},
|
||||
'swap': {
|
||||
'total': swap.total,
|
||||
'used': swap.used,
|
||||
'percent': swap.percent
|
||||
},
|
||||
'disk': {
|
||||
'total': disk.total,
|
||||
'used': disk.used,
|
||||
'free': disk.free,
|
||||
'percent': (disk.used / disk.total) * 100
|
||||
},
|
||||
'network': {
|
||||
'bytes_sent': network.bytes_sent,
|
||||
'bytes_recv': network.bytes_recv,
|
||||
'packets_sent': network.packets_sent,
|
||||
'packets_recv': network.packets_recv
|
||||
},
|
||||
'processes': {
|
||||
'count': process_count
|
||||
},
|
||||
'temperatures': temperatures,
|
||||
'timestamp': now
|
||||
}
|
||||
|
||||
self.last_update = now
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting system stats: {e}")
|
||||
self._cached_data['stats'] = {}
|
||||
|
||||
return self._cached_data.get('stats', {})
|
||||
|
||||
def get_processes(self, limit: int = 10) -> list:
|
||||
"""Get top processes by CPU usage"""
|
||||
try:
|
||||
processes = []
|
||||
for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent', 'username']):
|
||||
try:
|
||||
processes.append(proc.info)
|
||||
except (psutil.NoSuchProcess, psutil.AccessDenied):
|
||||
pass
|
||||
|
||||
# Sort by CPU usage
|
||||
processes.sort(key=lambda x: x.get('cpu_percent', 0), reverse=True)
|
||||
return processes[:limit]
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting processes: {e}")
|
||||
return []
|
||||
|
||||
def _get_cpu_info(self) -> str:
|
||||
"""Get CPU model name"""
|
||||
try:
|
||||
with open('/proc/cpuinfo', 'r') as f:
|
||||
for line in f:
|
||||
if 'model name' in line:
|
||||
return line.split(':')[1].strip()
|
||||
except:
|
||||
pass
|
||||
|
||||
# Fallback for non-Linux systems
|
||||
return f"{psutil.cpu_count()} CPU cores"
|
||||
|
||||
def _get_temperatures(self) -> Dict[str, float]:
|
||||
"""Get system temperatures if available"""
|
||||
try:
|
||||
temps = psutil.sensors_temperatures()
|
||||
result = {}
|
||||
|
||||
for name, entries in temps.items():
|
||||
for entry in entries:
|
||||
if entry.current:
|
||||
key = f"{name}_{entry.label}" if entry.label else name
|
||||
result[key] = entry.current
|
||||
|
||||
return result
|
||||
except:
|
||||
return {}
|
||||
|
||||
def _format_uptime(self, uptime: timedelta) -> str:
|
||||
"""Format uptime duration"""
|
||||
days = uptime.days
|
||||
hours, remainder = divmod(uptime.seconds, 3600)
|
||||
minutes, _ = divmod(remainder, 60)
|
||||
|
||||
if days > 0:
|
||||
return f"{days}d {hours}h {minutes}m"
|
||||
elif hours > 0:
|
||||
return f"{hours}h {minutes}m"
|
||||
else:
|
||||
return f"{minutes}m"
|
||||
|
||||
def format_bytes(self, bytes_value: int) -> str:
|
||||
"""Format bytes to human readable format"""
|
||||
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
|
||||
if bytes_value < 1024.0:
|
||||
return f"{bytes_value:.1f}{unit}"
|
||||
bytes_value /= 1024.0
|
||||
return f"{bytes_value:.1f}PB"
|
||||
Reference in New Issue
Block a user