Skip to content

Commit

Permalink
Include basic services and processes usage metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
dormant-user committed Sep 29, 2024
1 parent 90255c1 commit 3006e28
Show file tree
Hide file tree
Showing 9 changed files with 569 additions and 246 deletions.
104 changes: 104 additions & 0 deletions pyninja/gpu.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import json
import logging
import subprocess
from typing import List, Dict

from . import models

LOGGER = logging.getLogger("uvicorn.default")


def _darwin(lib_path):
result = subprocess.run(
[lib_path, "SPDisplaysDataType", "-json"],
capture_output=True,
text=True,
)
if result.stderr:
LOGGER.debug(result.stderr)
return
displays = json.loads(result.stdout).get("SPDisplaysDataType", [])
gpu_info = []
for display in displays:
if "sppci_model" in display.keys():
gpu_info.append(
dict(
model=display.get("sppci_model"),
cores=display.get("sppci_cores", "N/A"),
memory=display.get(
"sppci_vram", display.get("spdisplays_vram", "N/A")
),
vendor=display.get("sppci_vendor", "N/A"),
)
)
return gpu_info


def _linux(lib_path):
result = subprocess.run(
[lib_path],
capture_output=True,
text=True,
)
if result.stderr:
LOGGER.debug(result.stderr)
return
gpus = result.stdout.splitlines()
gpu_info = []
for line in gpus:
if "VGA" in line:
gpu = line.split(":")[-1].strip()
else:
continue
gpu_info.append(
dict(
model=gpu.split(":")[-1].strip(),
)
)
return gpu_info


def _windows(lib_path):
result = subprocess.run(
[
lib_path,
"path",
"win32_videocontroller",
"get",
"Name,AdapterCompatibility",
"/format:csv",
],
stdout=subprocess.PIPE,
text=True,
)
if result.stderr:
LOGGER.debug(result.stderr)
return
gpus_raw = [line for line in result.stdout.splitlines() if line.strip()]
try:
keys = (
gpus_raw[0]
.replace("Node", "node")
.replace("AdapterCompatibility", "vendor")
.replace("Name", "model")
.split(",")
)
values = "".join(gpus_raw[1:]).split(",")
except ValueError as error:
LOGGER.debug(error)
return
if len(values) >= len(keys):
result = []
for i in range(0, len(values), len(keys)):
result.append(dict(zip(keys, values[i : i + len(keys)])))
return result
else:
LOGGER.debug("ValueError: Not enough values for the keys")


def get_names() -> List[Dict[str, str]]:
fn_map = dict(linux=_linux, darwin=_darwin, windows=_windows)
try:
return fn_map[models.OPERATING_SYSTEM](models.env.gpu_lib)
except (subprocess.SubprocessError, FileNotFoundError) as error:
LOGGER.debug(error)
19 changes: 10 additions & 9 deletions pyninja/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,18 +143,16 @@ class DiskLib(BaseModel):
windows: FilePath = "C:\\Program Files\\PowerShell\\7\\pwsh.exe"


class WSSettings(BaseModel):
"""Default settings for websocket configuration.
class GPULib(BaseModel):
"""Default GPU library dedicated to each supported operating system.
>>> WSSettings
>>> GPULib
"""

cpu_interval: PositiveInt = 3
refresh_interval: PositiveInt | PositiveFloat = 5


ws_settings = WSSettings()
linux: FilePath = "/usr/bin/lspci"
darwin: FilePath = "/usr/sbin/system_profiler"
windows: FilePath = "C:\\Windows\\System32\\wbem\\wmic.exe"


class WSSession(BaseModel):
Expand All @@ -172,7 +170,7 @@ class WSSession(BaseModel):


def get_library(
library: Type[ServiceLib] | Type[ProcessorLib] | Type[DiskLib],
library: Type[ServiceLib] | Type[ProcessorLib] | Type[DiskLib] | Type[GPULib],
) -> FilePath:
"""Get service/processor/disk library filepath for the host operating system.
Expand Down Expand Up @@ -207,6 +205,9 @@ class EnvConfig(BaseSettings):
monitor_password: str | None = None
monitor_session: PositiveInt = 3_600
max_connections: PositiveInt = 3
processes: List[str] = []
services: List[str] = []
gpu_lib: FilePath = get_library(GPULib)
disk_lib: FilePath = get_library(DiskLib)
service_lib: FilePath = get_library(ServiceLib)
processor_lib: FilePath = get_library(ProcessorLib)
Expand Down
26 changes: 0 additions & 26 deletions pyninja/monitor/config.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,12 @@
import os
import string
import time

from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates


def capwords_filter(value: str) -> str:
"""Capitalizes a string.
Args:
value: String value to be capitalized.
See Also:
This function is added as a filter to Jinja2 templates.
Returns:
str:
Returns the capitalized string.
"""
if value.endswith("_raw"):
parts = value.split("_")
return " ".join(parts[:-1])
if value.endswith("_cap"):
parts = value.split("_")
return parts[0].upper() + " " + " ".join(parts[1:-1])
return string.capwords(value).replace("_", " ")


templates = Jinja2Templates(
directory=os.path.join(os.path.dirname(__file__), "templates")
)
# Add custom filter to Jinja2 environment
templates.env.filters["capwords"] = capwords_filter


async def clear_session(response: HTMLResponse) -> HTMLResponse:
Expand Down
90 changes: 90 additions & 0 deletions pyninja/monitor/resources.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import asyncio
import json
import logging
import os
import shutil
from concurrent.futures import ThreadPoolExecutor
from typing import List, Dict

import psutil

from .. import operations

# Use a ThreadPoolExecutor to run blocking functions in separate threads
EXECUTOR = ThreadPoolExecutor(max_workers=os.cpu_count())
LOGGER = logging.getLogger("uvicorn.default")


def get_cpu_percent() -> List[float]:
"""Get CPU usage percentage.
Returns:
List[float]:
Returns a list of CPU percentages.
"""
return psutil.cpu_percent(interval=1, percpu=True)


async def get_docker_stats() -> List[Dict[str, str]]:
"""Run the docker stats command asynchronously and parse the output.
Returns:
List[Dict[str, str]]:
Returns a list of key-value pairs with the container stat and value.
"""
process = await asyncio.create_subprocess_shell(
'docker stats --no-stream --format "{{json .}}"',
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
if stderr:
LOGGER.error(stderr.decode())
return []
return [json.loads(line) for line in stdout.decode().strip().splitlines()]


# noinspection PyProtectedMember
async def get_system_metrics() -> Dict[str, dict]:
"""Async handler for virtual memory, swap memory disk usage and CPU load averages.
Returns:
Dict[str, dict]:
Returns a nested dictionary.
"""
m1, m5, m15 = os.getloadavg() or (None, None, None)
return dict(
memory_info=psutil.virtual_memory()._asdict(),
swap_info=psutil.swap_memory()._asdict(),
disk_info=shutil.disk_usage("/")._asdict(),
load_averages=dict(m1=m1, m5=m5, m15=m15),
)


async def system_resources() -> Dict[str, dict]:
"""Gather system resources including Docker stats asynchronously.
Returns:
Dict[str, dict]:
Returns a nested dictionary.
"""
system_metrics_task = asyncio.create_task(get_system_metrics())
docker_stats_task = asyncio.create_task(get_docker_stats())
service_stats_task = asyncio.create_task(operations.service_monitor())
process_stats_task = asyncio.create_task(operations.process_monitor())

# CPU percent check is a blocking call and cannot be awaited, so run it in a separate thread
loop = asyncio.get_event_loop()
cpu_usage_task = loop.run_in_executor(EXECUTOR, get_cpu_percent)

system_metrics = await system_metrics_task
docker_stats = await docker_stats_task
service_stats = await service_stats_task
process_stats = await process_stats_task
cpu_usage = await cpu_usage_task

system_metrics["cpu_usage"] = cpu_usage
system_metrics["docker_stats"] = docker_stats
system_metrics["service_stats"] = service_stats
system_metrics["process_stats"] = process_stats
return system_metrics
Loading

0 comments on commit 3006e28

Please sign in to comment.