2025-09-18 03:41:48 +02:00
parent 7af7ba28a0
commit 8c6f070ea1
14 changed files with 1045 additions and 406 deletions

View File

@@ -1,5 +1,4 @@
from .system_monitor import SystemMonitor
from .gpu_monitor import GPUMonitor
from .data_manager import data_manager
__all__ = ['SystemMonitor', 'GPUMonitor', 'data_manager']
__all__ = ['SystemMonitor', 'GPUMonitor']
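With data_manager dropped from __all__, consumers import and instantiate the monitor classes directly. A minimal sketch of the new call pattern (not part of the commit; the package path src.monitors is an assumption, since the hunk does not show the file name):

    from src.monitors import SystemMonitor, GPUMonitor  # assumed path; data_manager is no longer exported

    system = SystemMonitor()   # bindable dataclass, fills static info in __post_init__
    gpu = GPUMonitor()         # auto-detects NVIDIA or AMD in __post_init__

    system.update()            # refresh dynamic stats on demand
    gpu.update()
    print(system.cpu_percent, gpu.usage)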

View File

@@ -1,142 +0,0 @@
import asyncio
import logging
import threading
import time
from typing import Dict, Any
from .system_monitor import SystemMonitor
from .gpu_monitor import GPUMonitor
logger = logging.getLogger(__name__)
class DataManager:
"""Global data manager that collects system information in the background"""
def __init__(self, update_interval: float = 1.0):
self.update_interval = update_interval
self.system_monitor = SystemMonitor()
self.gpu_monitor = GPUMonitor()
self._data = {
'system_info': {},
'system_stats': {},
'gpu_info': {},
'gpu_stats': {},
'last_update': 0
}
self._running = False
self._thread = None
self._lock = threading.RLock()
def start(self):
"""Start the background data collection"""
if self._running:
return
self._running = True
self._thread = threading.Thread(target=self._update_loop, daemon=True)
self._thread.start()
logger.info("DataManager started")
def stop(self):
"""Stop the background data collection"""
self._running = False
if self._thread and self._thread.is_alive():
self._thread.join(timeout=5)
logger.info("DataManager stopped")
def _update_loop(self):
"""Background loop that updates system data"""
while self._running:
try:
start_time = time.time()
# Collect system information
with self._lock:
# Static info (cached internally by monitors)
self._data['system_info'] = self.system_monitor.get_system_info()
self._data['gpu_info'] = self.gpu_monitor.get_gpu_info()
# Dynamic stats
self._data['system_stats'] = self.system_monitor.get_system_stats()
self._data['gpu_stats'] = self.gpu_monitor.get_primary_gpu_stats()
self._data['last_update'] = time.time()
# Calculate sleep time to maintain consistent intervals
elapsed = time.time() - start_time
sleep_time = max(0, self.update_interval - elapsed)
if sleep_time > 0:
time.sleep(sleep_time)
except Exception as e:
logger.error(f"Error in data update loop: {e}")
time.sleep(1) # Brief pause before retrying
def get_dashboard_data(self) -> Dict[str, Any]:
"""Get all data needed for the dashboard"""
with self._lock:
stats = self._data['system_stats']
gpu_stats = self._data['gpu_stats']
# Format data for dashboard consumption
return {
'cpu': {
'percent': round(stats.get('cpu', {}).get('percent', 0), 1),
'count': stats.get('cpu', {}).get('count', 0)
},
'memory': {
'percent': round(stats.get('memory', {}).get('percent', 0), 1),
'used_gb': round(stats.get('memory', {}).get('used', 0) / (1024**3), 1),
'total_gb': round(stats.get('memory', {}).get('total', 0) / (1024**3), 1)
},
'gpu': {
'percent': round(gpu_stats.get('usage', 0), 1),
'temperature': round(gpu_stats.get('temperature', 0), 1),
'available': gpu_stats.get('available', False)
},
'processes': {
'count': stats.get('processes', {}).get('count', 0)
},
'disk': {
'percent': round(stats.get('disk', {}).get('percent', 0), 1)
},
'network': {
'bytes_sent': stats.get('network', {}).get('bytes_sent', 0),
'bytes_recv': stats.get('network', {}).get('bytes_recv', 0)
},
'last_update': self._data['last_update']
}
def get_system_info(self) -> Dict[str, Any]:
"""Get static system information"""
with self._lock:
return self._data['system_info'].copy()
def get_system_stats(self) -> Dict[str, Any]:
"""Get current system statistics"""
with self._lock:
return self._data['system_stats'].copy()
def get_gpu_info(self) -> Dict[str, Any]:
"""Get static GPU information"""
with self._lock:
return self._data['gpu_info'].copy()
def get_gpu_stats(self) -> Dict[str, Any]:
"""Get current GPU statistics"""
with self._lock:
return self._data['gpu_stats'].copy()
def get_processes(self, limit: int = 10) -> list:
"""Get top processes (fetched on demand to avoid overhead)"""
return self.system_monitor.get_processes(limit)
def format_bytes(self, bytes_value: int) -> str:
"""Format bytes to human readable format"""
return self.system_monitor.format_bytes(bytes_value)
# Global instance
data_manager = DataManager()
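The deleted DataManager ran a daemon thread with interval compensation; with the bindable monitors introduced below, the periodic refresh can move into the UI loop instead. A minimal sketch, assuming NiceGUI's ui.timer drives the updates (the actual dashboard wiring lives in other files of this commit not shown here):

    from nicegui import ui
    from src.monitors import SystemMonitor, GPUMonitor  # assumed import path, see the note above

    system = SystemMonitor()
    gpu = GPUMonitor()

    ui.timer(1.0, system.update)   # stands in for DataManager._update_loop's 1 s cadence
    ui.timer(1.0, gpu.update)
    ui.run()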

View File

@@ -2,11 +2,144 @@ import subprocess
import re
import time
import logging
from dataclasses import dataclass, field
from typing import Dict, Any, Optional, List
from enum import Enum
from nicegui import binding
logger = logging.getLogger(__name__)
class GPUVendor(Enum):
AMD = "amd"
NVIDIA = "nvidia"
UNKNOWN = "unknown"
@binding.bindable_dataclass
class GPUMonitor:
"""Unified GPU monitor that auto-detects and monitors AMD or NVIDIA GPUs"""
# GPU identification
vendor: GPUVendor = GPUVendor.UNKNOWN
available: bool = False
gpu_name: str = "Unknown GPU"
driver_version: str = "Unknown"
# GPU stats
temperature: float = 0.0
usage: float = 0.0
memory_used: int = 0
memory_total: int = 0
memory_percent: float = 0.0
power_draw: float = 0.0
power_limit: float = 0.0
# Multi-GPU support
gpu_count: int = 0
gpu_list: List[Dict[str, Any]] = field(default_factory=list)
# Internal state
_monitor: Optional[Any] = None
last_update: Optional[float] = None
def __post_init__(self):
"""Initialize by detecting available GPU"""
self._detect_gpu()
if self.available:
self.update()
def _detect_gpu(self):
"""Detect which GPU vendor is available"""
# Try NVIDIA first
if self._check_nvidia():
self.vendor = GPUVendor.NVIDIA
self._monitor = GPUNVIDIAMonitor()
self.available = True
logger.info("Detected NVIDIA GPU")
# Then try AMD
elif self._check_amd():
self.vendor = GPUVendor.AMD
self._monitor = GPUAMDMonitor()
self.available = True
logger.info("Detected AMD GPU")
else:
self.available = False
logger.info("No GPU detected")
def _check_nvidia(self) -> bool:
"""Check if NVIDIA GPU is available"""
try:
result = subprocess.run(['nvidia-smi', '--help'],
capture_output=True, text=True, timeout=2)
return result.returncode == 0
except (subprocess.TimeoutExpired, FileNotFoundError):
return False
def _check_amd(self) -> bool:
"""Check if AMD GPU is available"""
try:
# Check for rocm-smi
result = subprocess.run(['rocm-smi', '--help'],
capture_output=True, text=True, timeout=2)
if result.returncode == 0:
return True
except (subprocess.TimeoutExpired, FileNotFoundError):
pass
# Check for GPU in /sys/class/drm
try:
import os
gpu_dirs = [d for d in os.listdir('/sys/class/drm') if d.startswith('card')]
for gpu_dir in gpu_dirs:
vendor_path = f'/sys/class/drm/{gpu_dir}/device/vendor'
if os.path.exists(vendor_path):
with open(vendor_path, 'r') as f:
vendor_id = f.read().strip()
if vendor_id == '0x1002': # AMD vendor ID
return True
except Exception:
pass
return False
def update(self):
"""Update GPU statistics"""
if not self.available or not self._monitor:
return
try:
# Get GPU info if not already retrieved
if not self.gpu_name or self.gpu_name == "Unknown GPU":
info = self._monitor.get_gpu_info()
if info.get('available'):
if info.get('cards'):
self.gpu_name = info['cards'][0].get('name', 'Unknown GPU')
self.gpu_count = len(info['cards'])
self.gpu_list = info['cards']
if self.vendor == GPUVendor.NVIDIA:
self.driver_version = info.get('driver_version', 'Unknown')
# Get GPU stats
stats = self._monitor.get_primary_gpu_stats()
if stats.get('available'):
self.temperature = stats.get('temperature', 0.0)
self.usage = stats.get('usage', 0.0)
self.memory_used = int(stats.get('memory_used', 0))
self.memory_total = int(stats.get('memory_total', 0))
self.memory_percent = stats.get('memory_percent', 0.0)
# Power stats (mainly for NVIDIA)
if self.vendor == GPUVendor.NVIDIA:
self.power_draw = stats.get('power_draw', 0.0)
self.power_limit = stats.get('power_limit', 0.0)
self.last_update = time.time()
except Exception as e:
logger.error(f"Error updating GPU stats: {e}")
class GPUAMDMonitor:
def __init__(self):
self.last_update = None
self.cache_duration = 2 # seconds
@@ -18,7 +151,7 @@ class GPUMonitor:
try:
# Check for rocm-smi (AMD)
result = subprocess.run(['rocm-smi', '--help'],
capture_output=True, text=True, timeout=5)
if result.returncode == 0:
return True
except (subprocess.TimeoutExpired, FileNotFoundError):
@@ -27,7 +160,7 @@ class GPUMonitor:
try:
# Check for radeontop
result = subprocess.run(['radeontop', '--help'],
capture_output=True, text=True, timeout=5)
if result.returncode == 0:
return True
except (subprocess.TimeoutExpired, FileNotFoundError):
@@ -68,7 +201,7 @@ class GPUMonitor:
now = time.time()
if (self.last_update is None or
now - self.last_update > self.cache_duration):
try:
stats = self._get_rocm_stats()
@@ -89,7 +222,7 @@ class GPUMonitor:
"""Get GPU info using rocm-smi"""
try:
result = subprocess.run(['rocm-smi', '--showid', '--showproductname'],
capture_output=True, text=True, timeout=10)
if result.returncode == 0:
lines = result.stdout.strip().split('\n')
@@ -118,7 +251,7 @@ class GPUMonitor:
try:
# Get temperature, usage, and memory info
result = subprocess.run(['rocm-smi', '--showtemp', '--showuse', '--showmeminfo'],
capture_output=True, text=True, timeout=10)
if result.returncode == 0:
stats = {'available': True, 'cards': []}
@@ -295,7 +428,7 @@ class GPUMonitor:
memory_percent = 0
if (primary_gpu.get('memory_used') is not None and
primary_gpu.get('memory_total') is not None and
primary_gpu['memory_total'] > 0):
memory_percent = (primary_gpu['memory_used'] / primary_gpu['memory_total']) * 100
return {
@@ -305,4 +438,222 @@ class GPUMonitor:
'memory_percent': memory_percent,
'memory_used': primary_gpu.get('memory_used', 0) or 0,
'memory_total': primary_gpu.get('memory_total', 0) or 0
}
class GPUNVIDIAMonitor:
def __init__(self):
self.last_update = None
self.cache_duration = 2 # seconds
self._cached_data = {}
self.gpu_available = self._check_gpu_availability()
def _check_gpu_availability(self) -> bool:
"""Check if NVIDIA GPU monitoring tools are available"""
try:
# Check for nvidia-smi
result = subprocess.run(['nvidia-smi', '--help'],
capture_output=True, text=True, timeout=5)
if result.returncode == 0:
return True
except (subprocess.TimeoutExpired, FileNotFoundError):
pass
return False
def get_gpu_info(self) -> Dict[str, Any]:
"""Get static GPU information"""
if not self.gpu_available:
return {'available': False, 'message': 'No NVIDIA GPU monitoring tools found'}
if not self._cached_data.get('gpu_info'):
try:
gpu_info = self._get_nvidia_info()
self._cached_data['gpu_info'] = gpu_info
except Exception as e:
logger.error(f"Error getting GPU info: {e}")
self._cached_data['gpu_info'] = {'available': False, 'error': str(e)}
return self._cached_data['gpu_info']
def get_gpu_stats(self) -> Dict[str, Any]:
"""Get real-time GPU statistics"""
if not self.gpu_available:
return {'available': False}
now = time.time()
if (self.last_update is None or
now - self.last_update > self.cache_duration):
try:
stats = self._get_nvidia_stats()
stats['timestamp'] = now
self._cached_data['stats'] = stats
self.last_update = now
except Exception as e:
logger.error(f"Error getting GPU stats: {e}")
self._cached_data['stats'] = {'available': False, 'error': str(e)}
return self._cached_data.get('stats', {'available': False})
def _get_nvidia_info(self) -> Dict[str, Any]:
"""Get GPU info using nvidia-smi"""
try:
# Get GPU name, driver version, and CUDA version
result = subprocess.run(['nvidia-smi', '--query-gpu=index,name,driver_version',
'--format=csv,noheader'],
capture_output=True, text=True, timeout=10)
if result.returncode == 0:
gpu_info = {'available': True, 'driver': 'nvidia-smi', 'cards': []}
# Get driver and CUDA version from general output
version_result = subprocess.run(['nvidia-smi'],
capture_output=True, text=True, timeout=10)
if version_result.returncode == 0:
# Parse driver version
driver_match = re.search(r'Driver Version:\s*(\S+)', version_result.stdout)
if driver_match:
gpu_info['driver_version'] = driver_match.group(1)
# Parse CUDA version
cuda_match = re.search(r'CUDA Version:\s*(\S+)', version_result.stdout)
if cuda_match:
gpu_info['cuda_version'] = cuda_match.group(1)
# Parse GPU info
lines = result.stdout.strip().split('\n')
for line in lines:
parts = [p.strip() for p in line.split(',')]
if len(parts) >= 3:
gpu_info['cards'].append({
'id': int(parts[0]),
'name': parts[1],
'driver_version': parts[2]
})
return gpu_info if gpu_info['cards'] else {'available': False}
return {'available': False}
except Exception as e:
logger.debug(f"nvidia-smi not available: {e}")
return {'available': False, 'error': str(e)}
def _get_nvidia_stats(self) -> Dict[str, Any]:
"""Get GPU stats using nvidia-smi"""
try:
# Query multiple metrics at once
result = subprocess.run([
'nvidia-smi',
'--query-gpu=index,temperature.gpu,utilization.gpu,memory.used,memory.total,power.draw,power.limit',
'--format=csv,noheader,nounits'
], capture_output=True, text=True, timeout=10)
if result.returncode == 0:
stats = {'available': True, 'cards': []}
lines = result.stdout.strip().split('\n')
for line in lines:
parts = [p.strip() for p in line.split(',')]
if len(parts) >= 5:
gpu_stats = {
'id': int(parts[0]),
'temperature': None,
'usage': None,
'memory_used': None,
'memory_total': None,
'power_draw': None,
'power_limit': None
}
# Parse temperature
if parts[1] and parts[1] != '[N/A]':
try:
gpu_stats['temperature'] = float(parts[1])
except ValueError:
pass
# Parse GPU utilization
if parts[2] and parts[2] != '[N/A]':
try:
gpu_stats['usage'] = int(parts[2])
except ValueError:
pass
# Parse memory usage
if parts[3] and parts[3] != '[N/A]':
try:
gpu_stats['memory_used'] = int(parts[3])
except ValueError:
pass
if parts[4] and parts[4] != '[N/A]':
try:
gpu_stats['memory_total'] = int(parts[4])
except ValueError:
pass
# Parse power stats if available
if len(parts) >= 7:
if parts[5] and parts[5] != '[N/A]':
try:
gpu_stats['power_draw'] = float(parts[5])
except ValueError:
pass
if parts[6] and parts[6] != '[N/A]':
try:
gpu_stats['power_limit'] = float(parts[6])
except ValueError:
pass
stats['cards'].append(gpu_stats)
return stats if stats['cards'] else {'available': False}
return {'available': False}
except Exception as e:
logger.debug(f"nvidia-smi stats not available: {e}")
return {'available': False, 'error': str(e)}
def get_primary_gpu_stats(self) -> Dict[str, Any]:
"""Get stats for the primary/first GPU"""
all_stats = self.get_gpu_stats()
if not all_stats.get('available') or not all_stats.get('cards'):
return {
'available': False,
'usage': 0,
'temperature': 0,
'memory_percent': 0,
'power_draw': 0,
'power_limit': 0
}
primary_gpu = all_stats['cards'][0]
# Calculate memory percentage
memory_percent = 0
if (primary_gpu.get('memory_used') is not None and
primary_gpu.get('memory_total') is not None and
primary_gpu['memory_total'] > 0):
memory_percent = (primary_gpu['memory_used'] / primary_gpu['memory_total']) * 100
return {
'available': True,
'usage': primary_gpu.get('usage', 0) or 0,
'temperature': primary_gpu.get('temperature', 0) or 0,
'memory_percent': memory_percent,
'memory_used': primary_gpu.get('memory_used', 0) or 0,
'memory_total': primary_gpu.get('memory_total', 0) or 0,
'power_draw': primary_gpu.get('power_draw', 0) or 0,
'power_limit': primary_gpu.get('power_limit', 0) or 0
}
if __name__ == "__main__":
monitor = GPUMonitor()
from pprint import pprint
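For reference, _get_nvidia_stats above parses one comma-separated line per GPU in the order it queries. A minimal sketch with an illustrative (not real) sample line, showing how memory_percent is derived:

    # Hypothetical nvidia-smi output with --format=csv,noheader,nounits, in the queried order
    # (index, temperature.gpu, utilization.gpu, memory.used, memory.total, power.draw, power.limit),
    # i.e. temperature in C, memory in MiB, power in W.
    line = "0, 52, 17, 2048, 24576, 41.23, 350.00"
    parts = [p.strip() for p in line.split(',')]
    memory_percent = int(parts[3]) / int(parts[4]) * 100 if int(parts[4]) > 0 else 0.0
    print(f"GPU {parts[0]}: {parts[2]}% load, {parts[1]} C, {memory_percent:.1f}% VRAM used")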

src/utils/ollama.py (new file, 66 additions)
View File

@@ -0,0 +1,66 @@
import httpx
from nicegui import ui
async def available_models(url='http://127.0.0.1:11434'):
async with httpx.AsyncClient() as client:
response = await client.get(f"{url}/api/tags")
response.raise_for_status()
return response.json()["models"]
async def loaded_models(url='http://127.0.0.1:11434'):
async with httpx.AsyncClient() as client:
response = await client.get(f"{url}/api/ps")
response.raise_for_status()
return response.json()
async def create_ollama_model(name, modelfile_content, url='http://127.0.0.1:11434'):
data = {
"name": name,
"from": "qwen2.5-coder:7b",
"modelfile": modelfile_content,
"stream": False
}
async with httpx.AsyncClient() as client:
response = await client.post(f"{url}/api/create", json=data)
response.raise_for_status()
print(response.text)
return response.json()
async def delete_model(name, url='http://127.0.0.1:11434') -> bool:
data = {"name": name}
async with httpx.AsyncClient() as client:
try:
response = await client.request("DELETE", f"{url}/api/delete", json=data)
if response.status_code == 200:
return True
else:
ui.notify(f'Failed to delete model: {response.text}', type='negative')
return False
except Exception as e:
ui.notify(f'Error deleting model: {str(e)}', type='negative')
return False
async def model_info(name, url='http://127.0.0.1:11434'):
data = {
"name": name
}
async with httpx.AsyncClient() as client:
response = await client.post(f"{url}/api/show", json=data)
response.raise_for_status()
return response.json()
async def stream_chat(data, url='http://127.0.0.1:11434'):
async with httpx.AsyncClient() as client:
async with client.stream('POST', f"{url}/api/chat", json=data) as response:
async for chunk in response.aiter_text():
yield chunk
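A minimal usage sketch for these helpers (not part of the commit). The payload follows Ollama's /api/chat schema; the model name, import path, and line-by-line chunk handling are assumptions:

    import asyncio
    import json

    from src.utils.ollama import available_models, stream_chat  # import path assumed from the file location

    async def demo():
        models = await available_models()                  # GET /api/tags
        print([m["name"] for m in models])

        payload = {
            "model": "qwen2.5-coder:7b",                   # assumed to be pulled locally
            "messages": [{"role": "user", "content": "Say hello"}],
            "stream": True,
        }
        async for chunk in stream_chat(payload):           # POST /api/chat, streamed
            # Ollama streams roughly one JSON object per line; a robust client would
            # buffer partial lines, this sketch assumes each chunk holds whole lines.
            for line in chunk.splitlines():
                if line.strip():
                    print(json.loads(line).get("message", {}).get("content", ""), end="", flush=True)

    asyncio.run(demo())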

View File

@@ -2,123 +2,146 @@ import psutil
import platform
import time
import logging
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Dict, Any
from typing import Dict, Any, List, Optional
from nicegui import binding
logger = logging.getLogger(__name__)
@binding.bindable_dataclass
class SystemMonitor:
def __init__(self):
self.last_update = None
self.cache_duration = 2 # seconds
self._cached_data = {}
# CPU stats
cpu_percent: float = 0.0
cpu_count: int = 0
cpu_frequency: Optional[float] = None
cpu_model: str = "Unknown"
load_avg: Optional[tuple] = None
def get_system_info(self) -> Dict[str, Any]:
"""Get static system information"""
if not self._cached_data.get('system_info'):
# Memory stats
memory_total: int = 0
memory_used: int = 0
memory_available: int = 0
memory_percent: float = 0.0
memory_free: int = 0
# Swap stats
swap_total: int = 0
swap_used: int = 0
swap_percent: float = 0.0
# Disk stats
disk_total: int = 0
disk_used: int = 0
disk_free: int = 0
disk_percent: float = 0.0
# Network stats
network_bytes_sent: int = 0
network_bytes_recv: int = 0
network_packets_sent: int = 0
network_packets_recv: int = 0
# System info
os_name: str = "Unknown"
kernel: str = "Unknown"
hostname: str = "Unknown"
architecture: str = "Unknown"
uptime: str = "0m"
# Process stats
process_count: int = 0
top_processes: List[Dict] = field(default_factory=list)
# Temperature
temperatures: Dict[str, float] = field(default_factory=dict)
# Update tracking
last_update: Optional[float] = None
def __post_init__(self):
"""Initialize static system information on creation"""
self._update_static_info()
self.update()
def _update_static_info(self):
"""Update static system information (called once on init)"""
try:
uname = platform.uname()
self.os_name = f"{uname.system}"
self.kernel = uname.release
self.hostname = uname.node
self.architecture = uname.machine
self.cpu_model = self._get_cpu_info()
self.cpu_count = psutil.cpu_count()
self.memory_total = psutil.virtual_memory().total
except Exception as e:
logger.error(f"Error getting static system info: {e}")
def update(self):
"""Update all dynamic system statistics"""
try:
# CPU stats
self.cpu_percent = psutil.cpu_percent(interval=0.1)
cpu_freq = psutil.cpu_freq()
self.cpu_frequency = cpu_freq.current if cpu_freq else None
# Load average (Unix only)
try:
uname = platform.uname()
boot_time = datetime.fromtimestamp(psutil.boot_time())
uptime = datetime.now() - boot_time
self.load_avg = psutil.getloadavg()
except AttributeError:
self.load_avg = None
self._cached_data['system_info'] = {
'os': f"{uname.system}",
'kernel': uname.release,
'cpu': self._get_cpu_info(),
'memory_total': psutil.virtual_memory().total,
'uptime': self._format_uptime(uptime),
'hostname': uname.node,
'architecture': uname.machine
}
except Exception as e:
logger.error(f"Error getting system info: {e}")
self._cached_data['system_info'] = {}
# Memory stats
memory = psutil.virtual_memory()
self.memory_total = memory.total
self.memory_available = memory.available
self.memory_used = memory.used
self.memory_percent = memory.percent
self.memory_free = memory.free
return self._cached_data['system_info']
# Swap stats
swap = psutil.swap_memory()
self.swap_total = swap.total
self.swap_used = swap.used
self.swap_percent = swap.percent
def get_system_stats(self) -> Dict[str, Any]:
"""Get real-time system statistics"""
now = time.time()
if (self.last_update is None or
now - self.last_update > self.cache_duration):
# Disk stats
disk = psutil.disk_usage('/')
self.disk_total = disk.total
self.disk_used = disk.used
self.disk_free = disk.free
self.disk_percent = (disk.used / disk.total) * 100 if disk.total > 0 else 0
try:
# CPU stats
cpu_percent = psutil.cpu_percent(interval=0.1)
cpu_count = psutil.cpu_count()
cpu_freq = psutil.cpu_freq()
# Network stats
network = psutil.net_io_counters()
if network:
self.network_bytes_sent = network.bytes_sent
self.network_bytes_recv = network.bytes_recv
self.network_packets_sent = network.packets_sent
self.network_packets_recv = network.packets_recv
# Memory stats
memory = psutil.virtual_memory()
swap = psutil.swap_memory()
# Process count
self.process_count = len(psutil.pids())
# Disk stats
disk = psutil.disk_usage('/')
# Update top processes
self.top_processes = self.get_top_processes(10)
# Network stats
network = psutil.net_io_counters()
# Temperature (if available)
self.temperatures = self._get_temperatures()
# Process count
process_count = len(psutil.pids())
# Uptime
boot_time = datetime.fromtimestamp(psutil.boot_time())
uptime = datetime.now() - boot_time
self.uptime = self._format_uptime(uptime)
# Load average (Unix only)
load_avg = None
try:
load_avg = psutil.getloadavg()
except AttributeError:
# Windows doesn't have load average
pass
self.last_update = time.time()
# Temperature (if available)
temperatures = self._get_temperatures()
except Exception as e:
logger.error(f"Error updating system stats: {e}")
self._cached_data['stats'] = {
'cpu': {
'percent': cpu_percent,
'count': cpu_count,
'frequency': cpu_freq.current if cpu_freq else None,
'load_avg': load_avg
},
'memory': {
'total': memory.total,
'available': memory.available,
'used': memory.used,
'percent': memory.percent,
'free': memory.free
},
'swap': {
'total': swap.total,
'used': swap.used,
'percent': swap.percent
},
'disk': {
'total': disk.total,
'used': disk.used,
'free': disk.free,
'percent': (disk.used / disk.total) * 100
},
'network': {
'bytes_sent': network.bytes_sent,
'bytes_recv': network.bytes_recv,
'packets_sent': network.packets_sent,
'packets_recv': network.packets_recv
},
'processes': {
'count': process_count
},
'temperatures': temperatures,
'timestamp': now
}
self.last_update = now
except Exception as e:
logger.error(f"Error getting system stats: {e}")
self._cached_data['stats'] = {}
return self._cached_data.get('stats', {})
def get_processes(self, limit: int = 10) -> list:
def get_top_processes(self, limit: int = 10) -> List[Dict]:
"""Get top processes by CPU usage"""
try:
processes = []
@@ -184,4 +207,10 @@ class SystemMonitor:
if bytes_value < 1024.0:
return f"{bytes_value:.1f}{unit}"
bytes_value /= 1024.0
return f"{bytes_value:.1f}PB"
return f"{bytes_value:.1f}PB"
if __name__ == '__main__':
from pprint import pprint
monitor = SystemMonitor()
pprint(monitor)
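SystemMonitor's fields bind the same way as GPUMonitor's. A minimal sketch, assuming NiceGUI, that reuses the class's own format_bytes helper for display (the 2 s interval mirrors the old cache_duration):

    from nicegui import ui

    monitor = SystemMonitor()
    ui.timer(2.0, monitor.update)
    ui.label().bind_text_from(monitor, 'memory_used',
                              backward=lambda b: f'RAM used: {monitor.format_bytes(b)}')
    ui.linear_progress(show_value=False).bind_value_from(
        monitor, 'memory_percent', backward=lambda p: p / 100)
    ui.run()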