Skip to content

Prometheus:client python

Prometheus instrumentation library for Python applications

Example

requirements.txt file:

fastapi==0.104.1
uvicorn[standard]==0.24.0
prometheus-client==0.19.0
psutil==5.9.6

main.py 파일:

"""
System and GPU Monitoring Server with Prometheus Metrics
시스템 및 GPU 모니터링 서버 (Prometheus 메트릭 지원)
"""

import psutil
import subprocess
import time
import platform
from typing import Optional, List, Dict, Any
from fastapi import FastAPI
from fastapi.responses import PlainTextResponse
from prometheus_client import Gauge, Counter, generate_latest, REGISTRY
import uvicorn

# FastAPI 앱 초기화 / Initialize FastAPI app
app = FastAPI(title="System & GPU Monitoring Server")

# Prometheus 메트릭 정의 / Define Prometheus metrics
# CPU 메트릭 / CPU metrics
cpu_usage = Gauge('system_cpu_usage_percent', 'CPU usage percentage')
cpu_count = Gauge('system_cpu_count', 'Number of CPU cores')
cpu_freq = Gauge('system_cpu_frequency_mhz', 'Current CPU frequency in MHz')

# 메모리 메트릭 / Memory metrics
memory_total = Gauge('system_memory_total_bytes', 'Total memory in bytes')
memory_used = Gauge('system_memory_used_bytes', 'Used memory in bytes')
memory_available = Gauge('system_memory_available_bytes', 'Available memory in bytes')
memory_percent = Gauge('system_memory_usage_percent', 'Memory usage percentage')

# 디스크 메트릭 / Disk metrics
disk_total = Gauge('system_disk_total_bytes', 'Total disk space in bytes', ['device', 'mountpoint'])
disk_used = Gauge('system_disk_used_bytes', 'Used disk space in bytes', ['device', 'mountpoint'])
disk_free = Gauge('system_disk_free_bytes', 'Free disk space in bytes', ['device', 'mountpoint'])
disk_percent = Gauge('system_disk_usage_percent', 'Disk usage percentage', ['device', 'mountpoint'])

# 네트워크 메트릭 / Network metrics
network_bytes_sent = Counter('system_network_bytes_sent_total', 'Total bytes sent', ['interface'])
network_bytes_recv = Counter('system_network_bytes_received_total', 'Total bytes received', ['interface'])

# GPU 메트릭 (NVIDIA) / GPU metrics (NVIDIA)
gpu_utilization = Gauge('gpu_utilization_percent', 'GPU utilization percentage', ['gpu_id', 'gpu_name'])
gpu_memory_used = Gauge('gpu_memory_used_mb', 'GPU memory used in MB', ['gpu_id', 'gpu_name'])
gpu_memory_total = Gauge('gpu_memory_total_mb', 'GPU memory total in MB', ['gpu_id', 'gpu_name'])
gpu_memory_percent = Gauge('gpu_memory_percent', 'GPU memory usage percentage', ['gpu_id', 'gpu_name'])
gpu_temperature = Gauge('gpu_temperature_celsius', 'GPU temperature in Celsius', ['gpu_id', 'gpu_name'])
gpu_power_draw = Gauge('gpu_power_draw_watts', 'GPU power draw in Watts', ['gpu_id', 'gpu_name'])
gpu_power_limit = Gauge('gpu_power_limit_watts', 'GPU power limit in Watts', ['gpu_id', 'gpu_name'])
gpu_fan_speed = Gauge('gpu_fan_speed_percent', 'GPU fan speed percentage', ['gpu_id', 'gpu_name'])
gpu_process_memory = Gauge('gpu_process_memory_mb', 'GPU memory used by process', ['gpu_id', 'gpu_name', 'pid', 'process_name'])

# CPU 코어별 메트릭 / Per-core CPU metrics
cpu_core_usage = Gauge('system_cpu_core_usage_percent', 'CPU usage per core', ['core'])

# CPU 프로세스 메트릭 / CPU process metrics
process_cpu_usage = Gauge('system_process_cpu_percent', 'Process CPU usage percentage', ['pid', 'name'])
process_memory_usage = Gauge('system_process_memory_percent', 'Process memory usage percentage', ['pid', 'name'])

# 네트워크 인터페이스 정보 / Network interface information
network_interface_info = Gauge('system_network_interface_info', 'Network interface information', ['interface', 'ip_address', 'netmask'])

# 스왑 메모리 메트릭 / Swap memory metrics
swap_total = Gauge('system_swap_total_bytes', 'Total swap memory in bytes')
swap_used = Gauge('system_swap_used_bytes', 'Used swap memory in bytes')
swap_percent = Gauge('system_swap_usage_percent', 'Swap memory usage percentage')

# 메모리 버퍼/캐시 메트릭 / Memory buffers/cache metrics
memory_buffers = Gauge('system_memory_buffers_bytes', 'Memory used for buffers')
memory_cached = Gauge('system_memory_cached_bytes', 'Memory used for cache')

# Load average 메트릭 / Load average metrics
load_average_1 = Gauge('system_load_average_1min', 'System load average for 1 minute')
load_average_5 = Gauge('system_load_average_5min', 'System load average for 5 minutes')
load_average_15 = Gauge('system_load_average_15min', 'System load average for 15 minutes')


def collect_system_metrics():
    """
    Collect system metrics using psutil
    psutil을 사용하여 시스템 메트릭 수집
    """
    try:
        # CPU 정보 수집 / Collect CPU information
        cpu_usage.set(psutil.cpu_percent(interval=1))
        cpu_count.set(psutil.cpu_count())
        cpu_freq_info = psutil.cpu_freq()
        if cpu_freq_info:
            cpu_freq.set(cpu_freq_info.current)

        # 코어별 CPU 사용률 / Per-core CPU usage
        per_cpu = psutil.cpu_percent(interval=0.1, percpu=True)
        for i, usage in enumerate(per_cpu):
            cpu_core_usage.labels(core=str(i)).set(usage)

        # Top CPU 프로세스 수집 (상위 10개) / Collect top CPU processes (top 10)
        top_processes = []
        for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent']):
            try:
                pinfo = proc.info
                if pinfo['cpu_percent'] is not None and pinfo['cpu_percent'] > 0:
                    top_processes.append({
                        'pid': pinfo['pid'],
                        'name': pinfo['name'],
                        'cpu_percent': pinfo['cpu_percent'],
                        'memory_percent': pinfo['memory_percent'] or 0
                    })
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                pass

        # CPU 사용률 순으로 정렬하고 상위 10개만 메트릭에 추가 / Sort by CPU and add top 10 to metrics
        top_processes.sort(key=lambda x: x['cpu_percent'], reverse=True)
        for proc in top_processes[:10]:
            process_cpu_usage.labels(
                pid=str(proc['pid']),
                name=proc['name']
            ).set(proc['cpu_percent'])
            process_memory_usage.labels(
                pid=str(proc['pid']),
                name=proc['name']
            ).set(proc['memory_percent'])

        # Load average (Linux/Unix만 지원) / Load average (Linux/Unix only)
        if hasattr(psutil, 'getloadavg'):
            try:
                load1, load5, load15 = psutil.getloadavg()
                load_average_1.set(load1)
                load_average_5.set(load5)
                load_average_15.set(load15)
            except Exception:
                pass

        # 메모리 정보 수집 / Collect memory information
        memory = psutil.virtual_memory()
        memory_total.set(memory.total)
        memory_used.set(memory.used)
        memory_available.set(memory.available)
        memory_percent.set(memory.percent)

        # 메모리 버퍼/캐시 (Linux만 지원) / Memory buffers/cache (Linux only)
        if hasattr(memory, 'buffers'):
            memory_buffers.set(memory.buffers)
        if hasattr(memory, 'cached'):
            memory_cached.set(memory.cached)

        # 스왑 메모리 정보 / Swap memory information
        swap = psutil.swap_memory()
        swap_total.set(swap.total)
        swap_used.set(swap.used)
        swap_percent.set(swap.percent)

        # 디스크 정보 수집 / Collect disk information
        for partition in psutil.disk_partitions():
            try:
                usage = psutil.disk_usage(partition.mountpoint)
                disk_total.labels(device=partition.device, mountpoint=partition.mountpoint).set(usage.total)
                disk_used.labels(device=partition.device, mountpoint=partition.mountpoint).set(usage.used)
                disk_free.labels(device=partition.device, mountpoint=partition.mountpoint).set(usage.free)
                disk_percent.labels(device=partition.device, mountpoint=partition.mountpoint).set(usage.percent)
            except PermissionError:
                # 권한 없는 파티션 스킵 / Skip partitions without permission
                continue

        # 네트워크 정보 수집 / Collect network information
        network_stats = psutil.net_io_counters(pernic=True)
        for interface, stats in network_stats.items():
            # Counter는 증가량만 추적하므로 _created 사용 / Use _created for Counter initialization
            if network_bytes_sent.labels(interface=interface)._value._value == 0:
                network_bytes_sent.labels(interface=interface).inc(stats.bytes_sent)
            if network_bytes_recv.labels(interface=interface)._value._value == 0:
                network_bytes_recv.labels(interface=interface).inc(stats.bytes_recv)

        # 네트워크 인터페이스 IP 정보 / Network interface IP information
        net_if_addrs = psutil.net_if_addrs()
        for iface, addrs in net_if_addrs.items():
            for addr in addrs:
                if addr.family == 2:  # AF_INET (IPv4)
                    network_interface_info.labels(
                        interface=iface,
                        ip_address=addr.address,
                        netmask=addr.netmask or "N/A"
                    ).set(1)  # 존재 여부만 표시 / Just indicate existence
                    break  # IPv4 주소만 하나 / Only one IPv4 address

    except Exception as e:
        print(f"Error collecting system metrics: {e}")


def collect_gpu_metrics():
    """
    Collect GPU metrics using nvidia-smi
    nvidia-smi를 사용하여 GPU 메트릭 수집
    """
    try:
        # nvidia-smi 명령 실행 / Execute nvidia-smi command
        result = subprocess.run(
            [
                'nvidia-smi',
                '--query-gpu=index,name,utilization.gpu,memory.used,memory.total,temperature.gpu,power.draw,power.limit,fan.speed',
                '--format=csv,noheader,nounits'
            ],
            capture_output=True,
            text=True,
            timeout=5
        )

        if result.returncode == 0:
            # 출력 파싱 / Parse output
            lines = result.stdout.strip().split('\n')
            for line in lines:
                if line:
                    parts = [p.strip() for p in line.split(',')]
                    if len(parts) >= 9:
                        gpu_id = parts[0]
                        gpu_name = parts[1]
                        util = float(parts[2]) if parts[2] not in ['[N/A]', '[Not Supported]'] else -1.0
                        mem_used = float(parts[3]) if parts[3] not in ['[N/A]', '[Not Supported]'] else 0.0
                        mem_total = float(parts[4]) if parts[4] not in ['[N/A]', '[Not Supported]'] else 0.0
                        temp = float(parts[5]) if parts[5] not in ['[N/A]', '[Not Supported]'] else 0.0
                        power = float(parts[6]) if parts[6] not in ['[N/A]', '[Not Supported]'] else -1.0
                        power_limit = float(parts[7]) if parts[7] not in ['[N/A]', '[Not Supported]'] else -1.0
                        fan = float(parts[8]) if parts[8] not in ['[N/A]', '[Not Supported]'] else 0.0

                        # GPU 메트릭 업데이트 / Update GPU metrics
                        gpu_utilization.labels(gpu_id=gpu_id, gpu_name=gpu_name).set(util)
                        gpu_memory_used.labels(gpu_id=gpu_id, gpu_name=gpu_name).set(mem_used)
                        gpu_memory_total.labels(gpu_id=gpu_id, gpu_name=gpu_name).set(mem_total)
                        if mem_total > 0:
                            gpu_memory_percent.labels(gpu_id=gpu_id, gpu_name=gpu_name).set((mem_used / mem_total) * 100)
                        gpu_temperature.labels(gpu_id=gpu_id, gpu_name=gpu_name).set(temp)
                        gpu_power_draw.labels(gpu_id=gpu_id, gpu_name=gpu_name).set(power)
                        gpu_power_limit.labels(gpu_id=gpu_id, gpu_name=gpu_name).set(power_limit)
                        gpu_fan_speed.labels(gpu_id=gpu_id, gpu_name=gpu_name).set(fan)

                        # GPU 프로세스 정보 수집 / Collect GPU process information
                        processes = get_gpu_processes(gpu_id)
                        for proc in processes:
                            gpu_process_memory.labels(
                                gpu_id=gpu_id,
                                gpu_name=gpu_name,
                                pid=str(proc['pid']),
                                process_name=proc['name']
                            ).set(proc['memory_mb'])
        else:
            print(f"nvidia-smi error: {result.stderr}")

    except FileNotFoundError:
        # nvidia-smi가 없는 경우 (GPU 없음) / nvidia-smi not found (no GPU)
        pass
    except Exception as e:
        print(f"Error collecting GPU metrics: {e}")


def get_gpu_processes(gpu_index: str) -> List[Dict[str, Any]]:
    """
    Get processes running on specific GPU
    특정 GPU에서 실행 중인 프로세스 조회
    """
    processes = []
    try:
        result = subprocess.run(
            [
                'nvidia-smi',
                '--query-compute-apps=pid,process_name,used_memory',
                '--format=csv,noheader,nounits',
                '-i', gpu_index
            ],
            capture_output=True,
            text=True,
            timeout=5
        )

        if result.returncode == 0 and result.stdout.strip():
            lines = result.stdout.strip().split('\n')
            for line in lines:
                if line:
                    parts = [p.strip() for p in line.split(',')]
                    if len(parts) >= 3:
                        processes.append({
                            "pid": int(parts[0]),
                            "name": parts[1],
                            "memory_mb": float(parts[2])
                        })
    except Exception as e:
        print(f"Error getting GPU processes: {e}")

    return processes


@app.get("/")
async def root():
    """
    Root endpoint / 루트 엔드포인트
    """
    return {
        "message": "System & GPU Monitoring Server",
        "endpoints": {
            "/metrics": "Prometheus metrics endpoint",
            "/health": "Health check endpoint",
            "/info": "System information"
        }
    }


@app.get("/metrics", response_class=PlainTextResponse)
async def metrics():
    """
    Prometheus metrics endpoint
    Prometheus 메트릭 엔드포인트
    """
    # 메트릭 수집 / Collect metrics
    collect_system_metrics()
    collect_gpu_metrics()

    # Prometheus 형식으로 메트릭 반환 / Return metrics in Prometheus format
    return generate_latest(REGISTRY)


@app.get("/health")
async def health():
    """
    Health check endpoint
    헬스 체크 엔드포인트
    """
    return {"status": "healthy", "timestamp": time.time()}


def format_bytes(bytes_val: float) -> str:
    """
    Convert bytes to human-readable format
    바이트를 사람이 읽기 쉬운 형식으로 변환
    """
    for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
        if bytes_val < 1024.0:
            return f"{bytes_val:.1f}{unit}"
        bytes_val /= 1024.0
    return f"{bytes_val:.1f}PB"


if __name__ == "__main__":
    # 서버 실행 / Run server
    print("Starting System & GPU Monitoring Server...")
    print("Prometheus metrics available at: http://localhost:8000/metrics")
    print("System info available at: http://localhost:8000/info")

    uvicorn.run(app, host="0.0.0.0", port=8000)

실행하자:

python main.py

이제 http://localhost:8000/metrics 으로 접속하면 된다.

See also

Favorite site