CPU使用率:是指在特定时间段内,CPU被占用的时间比例,通常以百分比表示。它反映了CPU的繁忙程度,是衡量系统负载和性能的重要指标。

一、核心概念

  1. 多核CPU的百分比计算方式
    • 每个CPU核心的利用率单独计算为100%
    • 8核CPU的理论最大利用率为 8*100% = 800%
    • 16核CPU则为1600%,以此类推

通常,任务管理器或top命令显示的是单个核心的使用率,比如四核CPU的话,总共有400%的可能。但如果是多线程或者多核处理器,不同的系统可能会有不同的显示方式。例如,在Linux系统中,top命令可能会显示超过100%的使用率,因为每个核心单独计算,所以如果有8个核心,800%就意味着所有核心都在满负荷运行。

  1. 常见场景
    • 计算密集型任务 (如视频渲染、科学计算)
    • 高并发服务 (Web服务器处理大量请求)
    • 程序异常 (死循环、内存泄漏)

二、诊断流程

  1. 定位进程

    # Linux/MacOStop-c# 按P排序CPU使用htop# 图形化查看各核心负载
    pidstat -u13# 详细进程监控# Windows
    任务管理器 → 性能标签 → 打开资源监视器
    
  2. 分析类型

    • 正常高负载 :所有核心满载但响应正常(如渲染输出文件持续增长)
    • 异常负载 :伴随系统卡顿、服务超时、温度报警
  3. 常见异常原因

    • 死循环代码 (如未正确退出的递归)
    • 线程竞争 (锁未释放导致大量线程等待)
    • 配置错误 (线程池设置过大)
    • 恶意挖矿程序

三、解决方案

对于预期内的高负载

  • 垂直扩展:升级CPU(更多核心/更高主频)
  • 水平扩展:分布式计算
  • 优化算法复杂度(如O(n²)→O(n logn))

对于异常高负载

# Linux终止进程sudokill-9$(pidof 异常进程名)# 或按CPU排序杀死前3高进程ps-eo pid,ppid,cmd,%cpu --sort=-%cpu |head-n4|awk'{print $1}'|xargskill-9

长期优化

  • 使用cgroups限制进程资源
  • 配置监控告警(Prometheus+Alertmanager)
  • 代码层添加性能熔断机制

四、特殊场景注意

  1. 容器环境 (Docker/K8s):

    # 查看容器CPU限制docker inspect <容器ID>|grep-i cpushares
    # 实时监控容器CPUdocker stats --format"table {{.Container}}\t{{.CPUPerc}}"

    可能显示超过100%但实际受限于cgroup配额

  2. 虚拟化环境

    • 需区分宿主CPU和客户机CPU分配
    • 可能因CPU超售导致高利用率假象

当遇到持续高CPU使用时,建议优先保存性能分析数据:

# 记录60秒的CPU使用采样
perf record -F99-a-g -- sleep60# 生成火焰图
perf script | FlameGraph/stackcollapse-perf.pl | FlameGraph/flamegraph.pl > cpu.svg

这将生成可视化分析报告,帮助精准定位热点代码。

五、代码监控脚本

当监控GPU显卡时:

import psutil
import pynvml
import time
import csv
import os
defget_process_info(pid=None, process_name=None):"""
    获取指定进程的CPU使用率和显存占用情况。
    :param pid: 进程ID
    :param process_name: 进程名称
    :return: 进程对象,CPU使用率,显存占用(MB)
    """
    process =Noneif pid:try:
            process = psutil.Process(pid)except psutil.NoSuchProcess:print(f"未找到PID为{pid}的进程。")returnNone,None,Noneelif process_name:for proc in psutil.process_iter(['pid','name']):if proc.info['name']== process_name:
                process = psutil.Process(proc.info['pid'])breakifnot process:print(f"未找到名称为{process_name}的进程。")returnNone,None,Noneelse:print("必须提供PID或进程名称。")returnNone,None,None# 获取CPU使用率
    cpu_usage = process.cpu_percent(interval=1)# 获取显存占用情况
    gpu_memory =0try:
        pynvml.nvmlInit()for i inrange(pynvml.nvmlDeviceGetCount()):
            handle = pynvml.nvmlDeviceGetHandleByIndex(i)
            mem_info = pynvml.nvmlDeviceGetMemoryInfo(handle)for proc in pynvml.nvmlDeviceGetComputeRunningProcesses(handle):if proc.pid == process.pid:
                    gpu_memory = proc.usedGpuMemory /(1024*1024)# 转换为MBbreak
        pynvml.nvmlShutdown()except pynvml.NVMLError:print("未检测到GPU或NVIDIA驱动未安装。")return process, cpu_usage, gpu_memory
defmonitor_process(pid=None, process_name=None, interval=1, duration=60, output_file="process_monitor.csv"):"""
    实时监控指定进程的CPU和显存占用情况,并将数据保存到CSV文件中。
    :param pid: 进程ID
    :param process_name: 进程名称
    :param interval: 监控间隔时间(秒)
    :param duration: 监控总时长(秒)
    :param output_file: 输出CSV文件名
    """ifnot pid andnot process_name:print("必须提供PID或进程名称。")returnwithopen(output_file, mode='w', newline='')asfile:
        writer = csv.writer(file)
        writer.writerow(["Timestamp","PID","Process Name","CPU Usage (%)","GPU Memory (MB)"])
        start_time = time.time()while time.time()- start_time < duration:
            process, cpu_usage, gpu_memory = get_process_info(pid, process_name)if process:
                timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
                writer.writerow([timestamp, process.pid, process.name(), cpu_usage, gpu_memory])print(f"{timestamp} - PID: {process.pid}, CPU: {cpu_usage}%, GPU Memory: {gpu_memory}MB")else:print("进程已结束或未找到。")break
            time.sleep(interval)if __name__ =="__main__":# 示例:监控PID为1234的进程,每2秒记录一次,持续60秒
    monitor_process(pid=1234, interval=2, duration=60, output_file="process_monitor.csv")# 或者通过进程名监控# monitor_process(process_name="example_process", interval=2, duration=60, output_file="process_monitor.csv")

如果是监控华为NPU,这里是使用npu-smi

import psutil
import subprocess
import time
import csv
import os
defget_npu_memory_usage(npu_id):"""
    获取指定NPU卡的显存占用情况。
    :param npu_id: NPU卡的ID
    :return: 显存占用率(MB)
    """try:# 使用npu-smi命令获取显存占用信息
        result = subprocess.run(["npu-smi","info","-t","usages","-i",str(npu_id)],
            capture_output=True, text=True, check=True)
        output = result.stdout
        # 解析输出,提取显存占用率for line in output.splitlines():if"Memory Usage Rate(%)"in line:
                memory_usage_rate =int(line.split(":")[1].strip().replace("%",""))breakreturn memory_usage_rate
    except subprocess.CalledProcessError as e:print(f"获取NPU显存占用失败: {e}")returnNonedefget_process_info(pid=None, process_name=None):"""
    获取指定进程的CPU使用率和NPU显存占用情况。
    :param pid: 进程ID
    :param process_name: 进程名称
    :return: 进程对象,CPU使用率,NPU显存占用(MB)
    """
    process =Noneif pid:try:
            process = psutil.Process(pid)except psutil.NoSuchProcess:print(f"未找到PID为{pid}的进程。")returnNone,None,Noneelif process_name:for proc in psutil.process_iter(['pid','name']):if proc.info['name']== process_name:
                process = psutil.Process(proc.info['pid'])breakifnot process:print(f"未找到名称为{process_name}的进程。")returnNone,None,Noneelse:print("必须提供PID或进程名称。")returnNone,None,None# 获取CPU使用率
    cpu_usage = process.cpu_percent(interval=1)# 获取NPU显存占用情况
    npu_memory =0try:# 假设NPU卡ID为0,可以根据实际情况修改
        npu_memory = get_npu_memory_usage(0)except Exception as e:print(f"获取NPU显存占用失败: {e}")return process, cpu_usage, npu_memory
defmonitor_process(pid=None, process_name=None, interval=1, duration=60, output_file="process_monitor.csv"):"""
    实时监控指定进程的CPU和NPU显存占用情况,并将数据保存到CSV文件中。
    :param pid: 进程ID
    :param process_name: 进程名称
    :param interval: 监控间隔时间(秒)
    :param duration: 监控总时长(秒)
    :param output_file: 输出CSV文件名
    """ifnot pid andnot process_name:print("必须提供PID或进程名称。")returnwithopen(output_file, mode='w', newline='')asfile:
        writer = csv.writer(file)
        writer.writerow(["Timestamp","PID","Process Name","CPU Usage (%)","NPU Memory Usage Rate (%)"])
        start_time = time.time()while time.time()- start_time < duration:
            process, cpu_usage, npu_memory = get_process_info(pid, process_name)if process:
                timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
                writer.writerow([timestamp, process.pid, process.name(), cpu_usage, npu_memory])print(f"{timestamp} - PID: {process.pid}, CPU: {cpu_usage}%, NPU Memory: {npu_memory}%")else:print("进程已结束或未找到。")break
            time.sleep(interval)if __name__ =="__main__":# 示例:监控PID为1234的进程,每2秒记录一次,持续60秒
    monitor_process(pid=1234, interval=2, duration=60, output_file="process_monitor.csv")# 或者通过进程名监控# monitor_process(process_name="example_process", interval=2, duration=60, output_file="process_monitor.csv")