package resources

import (
	"os"
	"path/filepath"
	"runtime"
	"sort"
	"time"

	"agent/cmd/agent/config"

	"github.com/prometheus/procfs"
	"github.com/shirou/gopsutil/process"
	cgroupStats "linkfog.com/public/lib/cgroup/stats"
	cgroupTypes "linkfog.com/public/lib/cgroup/types"
	"linkfog.com/public/lib/l"
	"linkfog.com/public/lib/unit"
)

// kmemSlabDumpThreshold is the kernel-memory usage (200 MiB) above which
// the cgroup slabinfo top entries are dumped for diagnosis.
const kmemSlabDumpThreshold = 200 * 1024 * 1024

// Watch periodically samples host, process, and container (cgroup) resource
// usage and logs it. The *SeqWarnCnt counters and hasReachLimit hold state
// for the consecutive-warning / downgrade logic configured in WatchConfig.
type Watch struct {
	cfg    WatchConfig
	cgroup *cgroupStats.Stats // collects cgroup CPU, memory and IO stats

	// Most recent samples, refreshed each detect interval.
	curMemUsage  float64
	curKMemUsage float64
	curCPUUsage  float64
	curCPULoad   float64
	curIOUsage   map[string]*cgroupTypes.IOUsage

	// Consecutive-warning counters used by the threshold logic.
	memSeqWarnCnt     int
	cpuSeqWarnCnt     int
	cpuLoadSeqWarnCnt int
	ioRdSeqWarnCnt    int
	ioWrSeqWarnCnt    int
	hasReachLimit     bool

	// NOTE(review): running is read and written from both the StartWatch
	// goroutine and Stop without synchronization — a data race under
	// `go test -race`. Consider atomic.Bool or a quit channel; left as-is
	// here because other files in this package may also access the field.
	running bool
}

// WatchConfig carries the limits, warning thresholds, and sampling
// parameters for a Watch.
type WatchConfig struct {
	MaxMem              float64 // maximum memory limit
	MaxCPU              float64 // maximum CPU limit
	MaxCPULoad          float64 // maximum CPU load limit (loadavg 5m)
	MaxRIO              float64 // maximum read IO limit
	MaxWIO              float64 // maximum write IO limit
	MemWarnPct          float64 // memory warning threshold (percent)
	MemSeqWarnTimes     int     // consecutive memory warnings before downgrade
	CPUWarnPct          float64 // CPU warning threshold (percent)
	CPUSeqWarnTimes     int     // consecutive CPU warnings before downgrade
	CPULoadSeqWarnTimes int     // consecutive CPU-load warnings before downgrade
	IOWarnPct           float64 // IO warning threshold (percent)
	IOSeqWarnTimes      int     // consecutive IO warnings before downgrade

	PProfProfilePath   string        // directory for collected pprof data
	PProfCollectDura   time.Duration // pprof collection duration
	DetectInterval     time.Duration // periodic detection interval
	DetectCPURateRange time.Duration // window for computing the stable CPU rate

	ExpandToHostCgroupDir        string
	ExpandHostCgroupAvailMemSize uint64
}

// NewWatch builds a Watch backed by a cgroup stats collector. It returns
// nil when the cgroup paths cannot be resolved (callers must check).
func NewWatch(cfg WatchConfig) *Watch {
	l.Info("resources watch create cgroup stats")
	stats := cgroupStats.NewStats(-1, cfg.DetectInterval, cfg.DetectCPURateRange)
	err := stats.TryCgroupPath()
	if err != nil {
		l.Error("resources watch cgroup stats try cgroup path err:", err)
		return nil
	}
	l.Infof("cgroup stats mem dir:%s", stats.GetMemoryDir())
	l.Infof("cgroup stats cpu dir:%s", stats.GetCPUDir())
	l.Infof("cgroup stats io dir:%s", stats.GetIODir())

	w := &Watch{
		cfg:     cfg,
		cgroup:  stats,
		running: false,
	}
	l.Infof("resources watch init, cfg:%+v", cfg)
	return w
}

// StartWatch blocks, sampling resource usage every DetectInterval until
// Stop clears the running flag. Intended to run in its own goroutine.
func (w *Watch) StartWatch() {
	w.running = true
	defer func() { w.running = false }()
	l.Info("resources watch start")
	for {
		if !w.running {
			break
		}
		// Sleep first so a Stop during the interval takes effect before
		// the next (possibly expensive) sampling pass.
		time.Sleep(w.cfg.DetectInterval)
		w.watchResourcesUsage()
	}
	l.Info("resources watch stopped")
}

// Stop signals StartWatch to exit after its current interval elapses.
func (w *Watch) Stop() {
	if !w.running {
		return
	}
	w.running = false
	l.Info("resources watch quit success")
}

// watchResourcesUsage performs one full sampling pass: host, process,
// container cgroup, then a summary log line.
func (w *Watch) watchResourcesUsage() {
	getAndPrintHostInfo()
	getAndPrintProcessInfo()
	w.getAndPrintContainerInfo()
	w.printResourcesUsage()
}

// getAndPrintProcessInfo logs the current goroutine count and the number
// of file descriptors held by this process.
func getAndPrintProcessInfo() {
	gn := runtime.NumGoroutine()
	proc, err := process.NewProcess(int32(os.Getpid()))
	if err != nil {
		l.Error("get process info error:", err)
		return
	}
	fn, err := proc.NumFDs()
	if err != nil {
		l.Error("get process fd num error:", err)
		return
	}
	l.Infof("process stats, goroutine_num:%d, fd_num:%d", gn, fn)
}

// getAndPrintContainerInfo refreshes the cached cgroup samples and logs
// detailed memory statistics; when kernel memory is high it also dumps
// the top slab caches to help diagnose kmem pressure.
func (w *Watch) getAndPrintContainerInfo() {
	w.getMemUsage()
	w.getCPUUsage()
	w.getCPULoad()
	w.getIOUsage()

	mem, err := cgroupStats.GetMemoryUsage(w.cgroup.GetMemoryDir())
	if err != nil {
		l.Error("get memory usage error,", err)
		return
	}
	kmem, err := cgroupStats.GetKernelMemoryUsage(w.cgroup.GetMemoryDir())
	if err != nil {
		l.Error("get kernel memory usage error,", err)
		return
	}
	w.curKMemUsage = kmem

	activeFile, inactiveFile, activeAnon, inactiveAnon, err := cgroupStats.GetMemoryStatFields(w.cgroup.GetMemoryDir())
	if err != nil {
		// BUGFIX: was l.Errorf with no format verbs (vet: no formatting
		// directives), now matches the plain l.Error style used above.
		l.Error("get cgroup stats error,", err)
		return
	}
	l.Infof("container cgroup mem stats, total_active_file:%s, total_inactive_file:%s, "+
		"total_active_anon:%s, total_inactive_anon:%s, usage_in_bytes:%s, kmem:%s",
		unit.ByteSize(activeFile), unit.ByteSize(inactiveFile), unit.ByteSize(activeAnon),
		unit.ByteSize(inactiveAnon), unit.ByteSize(uint64(mem)), unit.ByteSize(uint64(kmem)))

	if uint64(w.curKMemUsage) < kmemSlabDumpThreshold {
		return
	}
	// filepath.Join of a single element is a no-op; kept for parity with
	// the other Get*Dir call sites.
	slabInfo, err := cgroupStats.GetKernelMemorySlabInfo(filepath.Join(w.cgroup.GetMemoryDir()))
	if err != nil {
		l.Error("get cgroup slab error,", err)
		return
	}
	if len(slabInfo.Slabs) < 5 {
		l.Error("get cgroup slab error: slab num too few")
		return
	}
	sort.Sort(slabInfo)
	l.Debugf("container cgroup memory.kmem.slabinfo top3, %s:%d, %s:%d, %s:%d",
		slabInfo.Slabs[0].Name, slabInfo.Slabs[0].Cache,
		slabInfo.Slabs[1].Name, slabInfo.Slabs[1].Cache,
		slabInfo.Slabs[2].Name, slabInfo.Slabs[2].Cache)
}

// getMemUsage caches the cgroup working-set memory usage; on error the
// sample is recorded as zero so the watch loop keeps running.
func (w *Watch) getMemUsage() {
	memUsage, err := w.cgroup.GetMemoryWorkingSet()
	if err != nil {
		memUsage = 0
		l.Error("get memory usage error,", err)
	}
	w.curMemUsage = memUsage
}

// getCPUUsage caches the cgroup CPU usage; zero on error (best effort).
func (w *Watch) getCPUUsage() {
	cpuUsage, err := w.cgroup.GetCPUUsage()
	if err != nil {
		cpuUsage = 0
		l.Error("get cpu usage error,", err)
	}
	w.curCPUUsage = cpuUsage
}

// getCPULoad caches the host 5-minute load average read via procfs;
// zero on error (best effort).
func (w *Watch) getCPULoad() {
	cpuLoad, err := w.getCPULoadAvg5M(config.MountProc)
	if err != nil {
		cpuLoad = 0
		l.Error("get cpu 5m loadavg error,", err)
	}
	w.curCPULoad = cpuLoad
}

// getIOUsage caches per-device block IO usage; an empty map on error so
// consumers never index into nil.
func (w *Watch) getIOUsage() {
	ioUsage, err := w.cgroup.GetIOUsage()
	if err != nil {
		ioUsage = make(map[string]*cgroupTypes.IOUsage)
		l.Error("get block io usage error,", err)
	}
	w.curIOUsage = ioUsage
}

// printResourcesUsage logs a one-line summary of the latest samples plus
// per-device IO detail at debug level.
func (w *Watch) printResourcesUsage() {
	var ioTotalRbps uint64
	var ioTotalWbps uint64
	if ioUsage, ok := w.curIOUsage["total"]; ok {
		ioTotalRbps = uint64(ioUsage.Read)
		ioTotalWbps = uint64(ioUsage.Write)
	}
	// BUGFIX: was %s on a struct (prints %!s(...) noise); %+v matches the
	// cfg formatting used in NewWatch.
	l.Debugf("resources watch cfg, %+v", w.cfg)
	l.Infof("resources usage, mem:%s, cpu:%.2f, load:%.2f, rps:%s, wps:%s",
		unit.ByteSize(uint64(w.curMemUsage)), w.curCPUUsage, w.curCPULoad,
		unit.ByteSize(ioTotalRbps), unit.ByteSize(ioTotalWbps))
	w.printAllDevIOUsage()
}

// printAllDevIOUsage logs per-device read/write rates, skipping the
// aggregate "total" entry and fully idle devices.
func (w *Watch) printAllDevIOUsage() {
	for dev, ioUsage := range w.curIOUsage {
		if dev == "total" {
			continue
		}
		if ioUsage.Read == 0 && ioUsage.Write == 0 {
			continue
		}
		l.Debugf("device io usage, dev:%s, rps:%s, wps:%s",
			dev, unit.ByteSize(uint64(ioUsage.Read)), unit.ByteSize(uint64(ioUsage.Write)))
	}
}

// getCPULoadAvg5M returns the host 5-minute load average. procDir selects
// an alternate proc mount (e.g. a host /proc bind-mounted into the
// container); empty means the default /proc.
func (w *Watch) getCPULoadAvg5M(procDir string) (float64, error) {
	var fs procfs.FS
	var err error
	if procDir != "" {
		// procfs.NewFS rejects trailing slashes; trim a single one.
		if procDir[len(procDir)-1] == '/' {
			procDir = procDir[:len(procDir)-1]
		}
		fs, err = procfs.NewFS(procDir)
	} else {
		// Dead assignment of procfs.DefaultMountPoint to procDir removed:
		// procDir is never read after this branch.
		fs, err = procfs.NewDefaultFS()
	}
	if err != nil {
		return 0, err
	}
	loadAvg, err := fs.LoadAvg()
	if err != nil {
		return 0, err
	}
	return loadAvg.Load5, nil
}