Commit 89455b57 authored by Lei Li's avatar Lei Li
Browse files

Initial commit

parents
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml
<?xml version="1.0" encoding="UTF-8"?>
<module type="WEB_MODULE" version="4">
<component name="Go" enabled="true">
<buildTags>
<option name="os" value="linux" />
</buildTags>
</component>
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="module" module-name="myTest" />
</component>
</module>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/agent.iml" filepath="$PROJECT_DIR$/.idea/agent.iml" />
<module fileurl="file://$PROJECT_DIR$/../../myTest/.idea/myTest.iml" filepath="$PROJECT_DIR$/../../myTest/.idea/myTest.iml" />
</modules>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>
\ No newline at end of file
# agent
采集数据,上报信息
\ No newline at end of file
package config
import "time"
var (
WorkDir = "/linkfog/agent/"
LogDir = WorkDir + "/log"
LogFileName = "agent.log"
DaemonLogFileName = "agent_daemon.log"
MountPrefix = "/host"
MountProc = MountPrefix + "/proc/"
PluginDir = WorkDir + "/plugin"
BuildCommit = ""
MonitorLogUnModDuration = 600 * time.Second
BinDir = WorkDir + "/bin"
)
package core
import (
"agent/cmd/agent/config"
"agent/cmd/agent/option"
"agent/module"
"agent/module/plugin"
"agent/module/resources"
"fmt"
"linkfog.com/public/lib/monitor_daemon"
"os"
"path/filepath"
"strings"
"time"
"linkfog.com/public/lib/l"
"github.com/judwhite/go-svc"
)
var AgentSvc = &Agent{}
type Agent struct {
IsStop bool
}
func (a *Agent) Init(env svc.Environment) error {
option.Parse()
if option.Opt.EnableMonitorDaemon {
config.LogFileName = config.DaemonLogFileName
}
// 日志设置
err := l.SetConfig(l.Config{
Path: filepath.Join(config.LogDir, config.LogFileName),
MaxSize: int64(option.Opt.LogMaxSizeKB),
MaxRolls: option.Opt.LogRollNum,
LogLevel: l.LevelInfo,
VerboseLevel: option.Opt.Verbose,
StdoutLimit: l.DefaultStdoutLimit,
})
if err != nil {
return fmt.Errorf("set log config err:%v", err)
}
return nil
}
func (a *Agent) Start() error {
if option.Opt.PrintVersion {
fmt.Println(genVersion())
os.Exit(0)
}
if option.Opt.EnableMonitorDaemon {
l.Info("======start monitor_daemon======")
param := fmt.Sprintf("%s=false", option.ArgEnableMonitorDaemon)
cmdline := strings.Replace(
strings.Join(append(os.Args, param), " "), "", "", -1)
opts := []monitor_daemon.MonitorDaemonOpt{
monitor_daemon.WithForwardSignalType(monitor_daemon.ForwardSignalChildGroup),
monitor_daemon.WithMaxInterval(option.Opt.MonitorDefaultMaxInterval), // 设置重新拉起进程的最大时间间隔,防止频繁重启带来的系统压力
monitor_daemon.WithIncrInterval(option.Opt.MonitorDefaultIncrInterval), // 设置重新拉起进程的时间间隔,防止频繁重启带来的系统压力
monitor_daemon.WithMaxRetryTimes(option.Opt.MonitorDefaultMaxRetryTimes),
monitor_daemon.WithAgentBackupPath(filepath.Join(config.BinDir, option.Opt.MonitorDefaultBackupFileName)),
}
if option.Opt.EnableProcAbnormalCb {
opts = append(opts, monitor_daemon.WithIsProcAbnormalCallback(IsProcAbnormal))
}
monitorDaemon, err := monitor_daemon.New(cmdline, opts...)
if err != nil {
l.Error("monitor_daemon new error:", err, ", cmdline:", cmdline)
os.Exit(1)
}
go func() {
l.Info("monitor_daemon run, cmdline:", cmdline)
monitorDaemon.Run()
l.Info("======monitor_daemon stop======")
}()
return nil
}
l.Info("======start agent======")
l.Info("version:", genVersion())
if err := initSvcCfg(); err != nil {
return err
}
if err := startSvcModules(); err != nil {
return err
}
return nil
}
func (a *Agent) Stop() error {
if !a.IsStop {
a.IsStop = true
// monitor进程直接退出
if option.Opt.EnableMonitorDaemon {
l.Info("agent daemon stopped")
return nil
}
l.Info("agent stop")
module.StopEveryModule()
}
return nil
}
func genVersion() string {
return ""
}
func initSvcCfg() error {
return nil
}
func startSvcModules() error {
// 启动插件管理器
if option.Opt.EnablePluginManagerModule {
if err := module.RegisterModule(plugin.New()); err != nil {
return fmt.Errorf("register module plugin_mgr err:%v", err)
}
}
// 启动资源监控
if option.Opt.EnableResourcesModule {
if err := module.RegisterModule(resources.New()); err != nil {
return fmt.Errorf("register module resources err:%v", err)
}
}
return nil
}
var (
preHadesLogModifyTime = time.Now()
preHadesLogRecordTime = time.Now()
)
func IsProcAbnormal() bool {
if finfo, err := os.Stat(filepath.Join(config.LogDir, config.LogFileName)); err == nil {
l.Debug("check proc alive, hades log file ModTime:", finfo.ModTime())
if !preHadesLogModifyTime.Equal(finfo.ModTime()) {
preHadesLogModifyTime = finfo.ModTime()
preHadesLogRecordTime = time.Now()
}
if time.Since(preHadesLogRecordTime) >= config.MonitorLogUnModDuration {
l.Error("check proc alive, hades log file has not been updated for a long time")
return true
}
} else {
l.Warn("check proc alive, hades log file stat error:", err)
}
return false
}
package global
const (
ResourceModuleName = "resources"
PluginModuleName = "pluginMgr"
)
package main
import (
"agent/cmd/agent/core"
"os"
"syscall"
"github.com/judwhite/go-svc"
"linkfog.com/public/lib/l"
)
func main() {
if err := svc.Run(core.AgentSvc, syscall.SIGINT, syscall.SIGTERM); err != nil {
l.Errorf("agent svc run err:%v, program exit", err)
l.Close()
os.Exit(1)
}
l.Info("agent svc run finished, program exit")
l.Close()
}
package option
import (
"flag"
"fmt"
"os"
"path/filepath"
"time"
"github.com/olekukonko/tablewriter"
)
var Opt options
type options struct {
Usage bool
PrintVersion bool
Debug bool
Verbose int
LogMaxSizeKB int // 日志单个文件磁盘空间占用最大值
LogRollNum int // 日志文件个数
EnableResourcesModule bool // 资源监控模块开关
// plugin
EnablePluginManagerModule bool
EnableValidatePluginMD5 bool
PluginStatusCheckTimes int // 插件状态检查次数
PluginStatusCheckDur time.Duration // 插件状态检查间隔
EnableMonitorDaemon bool // 监控守护进程
EnableProcAbnormalCb bool // 进程异常回调开关
MonitorDefaultMaxInterval time.Duration
MonitorDefaultIncrInterval time.Duration
MonitorDefaultMaxRetryTimes int
MonitorDefaultBackupFileName string
}
const (
ArgUsage = "usage"
ArgVersion = "v"
ArgDebug = "d"
ArgVerbose = "verbose"
ArgLogMaxSizeKB = "log-max-size-kb"
ArgLogRollNum = "log-roll-num"
ArgEnableResourcesModule = "enable-resources-module"
ArgEnablePluginManagerModule = "enable-plugin-manager"
ArgEnableValidatePluginMD5 = "enable-validate-plugin-md5"
ArgPluginStatusCheckTimes = "plugin-status-check-times"
ArgPluginStatusCheckDur = "plugin-status-check-dur"
ArgEnableMonitorDaemon = "enable-monitor-daemon"
ArgEnableProcAbnormalCb = "enable-proc-abnormal-cb"
ArgMonitorDefaultMaxInterval = "monitor-default-max-interval"
ArgMonitorDefaultIncrInterval = "monitor-default-incr-interval"
ArgMonitorDefaultMaxRetryTimes = "monitor-default-max-retry-times"
ArgMonitorDefaultBackupFileName = "monitor-default-backup-file-name"
)
func flagSet() {
flag.BoolVar(&Opt.Usage, ArgUsage, false, "custom format usage")
flag.BoolVar(&Opt.PrintVersion, ArgVersion, false, "print version and exit")
flag.BoolVar(&Opt.Debug, ArgDebug, false, "debug mode")
flag.IntVar(&Opt.Verbose, ArgVerbose, 0, "debug log verbose level")
flag.IntVar(&Opt.LogMaxSizeKB, ArgLogMaxSizeKB, 50000, "log file max size, unit is kb")
flag.IntVar(&Opt.LogRollNum, ArgLogRollNum, 5, "log file max number")
flag.BoolVar(&Opt.EnableResourcesModule, ArgEnableResourcesModule, true, "enable resources module")
flag.BoolVar(&Opt.EnablePluginManagerModule, ArgEnablePluginManagerModule, true, "enable plugin manager")
flag.BoolVar(&Opt.EnableValidatePluginMD5, ArgEnableValidatePluginMD5, false, "enable validate plugin md5")
flag.IntVar(&Opt.PluginStatusCheckTimes, ArgPluginStatusCheckTimes, 5, "plugin status check times")
flag.DurationVar(&Opt.PluginStatusCheckDur, ArgPluginStatusCheckDur, 10*time.Second, "plugin status check dur, unit is second")
flag.BoolVar(&Opt.EnableMonitorDaemon, ArgEnableMonitorDaemon, true, "monitor daemon mode")
flag.BoolVar(&Opt.EnableProcAbnormalCb, ArgEnableProcAbnormalCb, true, "enable proc abnormal callback")
flag.DurationVar(&Opt.MonitorDefaultMaxInterval, ArgMonitorDefaultMaxInterval, 1*time.Minute, "the maximum time interval for restarting a process")
flag.DurationVar(&Opt.MonitorDefaultIncrInterval, ArgMonitorDefaultIncrInterval, 12*time.Second, "every time the process is restarted, the time interval increases")
flag.IntVar(&Opt.MonitorDefaultMaxRetryTimes, ArgMonitorDefaultMaxRetryTimes, 3, "maximum number of error retries")
flag.StringVar(&Opt.MonitorDefaultBackupFileName, ArgMonitorDefaultBackupFileName, "agent.bak", "agent file backup")
}
func Parse() {
flagSet()
flag.Parse()
if Opt.Usage {
usage()
os.Exit(0)
}
if flag.NArg() != 0 {
fmt.Println("unexpected non-flag args:", flag.Args())
os.Exit(1)
}
}
func usage() {
_, err := fmt.Fprintf(os.Stderr, "Usage of %s:\n", filepath.Base(os.Args[0]))
if err != nil {
fmt.Println(err)
return
}
table := tablewriter.NewWriter(os.Stderr)
table.SetHeader([]string{"Name", "Default", "Usage"})
table.SetHeaderAlignment(tablewriter.ALIGN_LEFT)
table.SetAutoWrapText(false)
table.SetAlignment(tablewriter.ALIGN_LEFT)
table.SetCenterSeparator("")
table.SetColumnSeparator("")
table.SetRowSeparator("")
table.SetHeaderLine(false)
table.SetBorder(false)
flag.VisitAll(func(f *flag.Flag) {
table.Append([]string{"-" + f.Name, f.DefValue, f.Usage})
})
table.Render()
}
module agent
go 1.23.2
replace linkfog.com/public => web.lueluesay.top/git/lil/public v0.0.0-20241014113926-be062ca04146
replace linkfog.com/pluginx => web.lueluesay.top/git/lil/pluginx v0.0.0-20241014064823-a6e286ccb5cd
require (
github.com/judwhite/go-svc v1.2.1
github.com/olekukonko/tablewriter v0.0.5
github.com/prometheus/procfs v0.15.1
github.com/shirou/gopsutil v3.21.11+incompatible
linkfog.com/pluginx v0.0.0-00010101000000-000000000000
linkfog.com/public v0.0.0-00010101000000-000000000000
)
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/mattn/go-runewidth v0.0.9 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/satori/go.uuid v1.2.0 // indirect
github.com/tklauser/go-sysconf v0.3.14 // indirect
github.com/tklauser/numcpus v0.8.0 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
golang.org/x/net v0.28.0 // indirect
golang.org/x/sys v0.26.0 // indirect
golang.org/x/text v0.17.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 // indirect
google.golang.org/grpc v1.67.1 // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/judwhite/go-svc v1.2.1 h1:a7fsJzYUa33sfDJRF2N/WXhA+LonCEEY8BJb1tuS5tA=
github.com/judwhite/go-svc v1.2.1/go.mod h1:mo/P2JNX8C07ywpP9YtO2gnBgnUiFTHqtsZekJrUuTk=
github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0=
github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec=
github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI=
github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/tklauser/go-sysconf v0.3.14 h1:g5vzr9iPFFz24v2KZXs/pvpvh8/V9Fw6vQK5ZZb78yU=
github.com/tklauser/go-sysconf v0.3.14/go.mod h1:1ym4lWMLUOhuBOPGtRcJm7tEGX4SCYNEEEtghGG/8uY=
github.com/tklauser/numcpus v0.8.0 h1:Mx4Wwe/FjZLeQsK/6kt2EOepwwSl7SmJrK5bV/dXYgY=
github.com/tklauser/numcpus v0.8.0/go.mod h1:ZJZlAY+dmR4eut8epnzf0u/VwodKmryxR8txiloSqBE=
github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0=
github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE=
golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg=
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo=
golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc=
golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 h1:e7S5W7MGGLaSu8j3YjdezkZ+m1/Nm0uRVRMEMGk26Xs=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU=
google.golang.org/grpc v1.67.1 h1:zWnc1Vrcno+lHZCOofnIMvycFcc0QRGIzm9dhnDX68E=
google.golang.org/grpc v1.67.1/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
web.lueluesay.top/git/lil/pluginx v0.0.0-20241014064823-a6e286ccb5cd h1:KaqJZ9MQkJ2mQDHXe9pm5MIgW5xpzNekj4OYSYn4cR4=
web.lueluesay.top/git/lil/pluginx v0.0.0-20241014064823-a6e286ccb5cd/go.mod h1:DL73qsfCIFfH2K6tdOLPj9tRcZ8AYxfPrkcTebYXr5o=
web.lueluesay.top/git/lil/public v0.0.0-20241014113926-be062ca04146 h1:Y9ZcWzfBrrRlS8LS/XkwURMeUfjGPQkjiokK6YG+2wI=
web.lueluesay.top/git/lil/public v0.0.0-20241014113926-be062ca04146/go.mod h1:x/nRP9pMRVToI9Te1TazybP0Qlj3V+/aA2EiPQEvzsI=
package module
import (
"fmt"
"time"
"linkfog.com/public/lib/l"
)
type Runnable interface {
Name() string
Start() error
Stop()
IsRunning() bool
}
var moduleTable = map[string]Runnable{}
func RegisterModule(mod Runnable) error {
name := mod.Name()
if _, ok := moduleTable[name]; ok {
l.Error("module already exists:", name)
return nil
}
moduleTable[name] = mod
if err := mod.Start(); err != nil {
return fmt.Errorf("regist module %s failed:%s", name, err)
}
l.Infof("register module %s success", name)
return nil
}
func UnRegisterModule(name string) {
l.Info("UnRegisterModule", name)
if _, ok := moduleTable[name]; !ok {
l.Debug("module not exists", name)
return
}
if moduleTable[name].IsRunning() {
moduleTable[name].Stop()
time.Sleep(5 * time.Second)
}
delete(moduleTable, name)
l.Info("UnRegisterModule success", name)
}
func IsRunning(name string) bool {
if m, ok := moduleTable[name]; ok {
return m.IsRunning()
}
return false
}
func StopEveryModule() {
for _, m := range moduleTable {
m.Stop()
}
// just give 3 second for every module to stop gracefully
time.Sleep(3 * time.Second)
}
package plugin
import (
"errors"
"time"
"agent/cmd/agent/config"
)
// 插件名
const (
HadesPlugin = "agent"
DobermanPlugin = "doberman"
)
// 插件函数
const (
ProcessPluginMsg = "processPluginMsg"
)
// 插件消息
const (
PluginOpenDebugMsg = "pluginOpenDebugMsg"
PluginCloseDebugMsg = "pluginCloseDebugMsg"
)
var (
HadesPluginSocket = config.PluginDir + "/" + HadesPlugin + ".sock"
DobermanPluginSocket = config.PluginDir + "/" + DobermanPlugin + ".sock"
DefaultCallTimeout = 5 * time.Second
ErrDisconnected = errors.New("doberman plugin disconnected")
)
type Msg struct {
PlgName string
PlgMsg CommMsg
}
type CommMsg struct {
MsgType string
}
package plugin
import (
"agent/cmd/agent/global"
"fmt"
"linkfog.com/public/lib/common"
"strings"
"sync"
"time"
"agent/cmd/agent/option"
"linkfog.com/pluginx/pluginmgr"
"linkfog.com/pluginx/pluginrpc"
"linkfog.com/public/lib/l"
)
var defaultPluginMgrConf string = `{
"doberman": {
"name": "doberman",
"path": "/dosec/plugin/doberman",
"enable": true,
"mem": 629145600,
"md5": "uninitialized"
}
}`
var defaultPluginCliConf = map[string]string{
DobermanPlugin: DobermanPluginSocket,
}
type Plugin struct {
signal chan string
signalSize int
isRunning bool
plgServer *pluginServer
plgConf map[string]*pluginmgr.PluginProcessConf
processMgr *pluginmgr.PluginProcessMgr
grpcServer *pluginrpc.PluginGrpcServer
grpcClient *pluginrpc.PluginGrpcClient
sync.Mutex
}
type PluginOpt func(*Plugin)
func New(opts ...PluginOpt) *Plugin {
p := Plugin{
signalSize: 10,
}
for _, opt := range opts {
opt(&p)
}
p.signal = make(chan string, p.signalSize)
return &p
}
func (p *Plugin) Start() error {
if p.IsRunning() {
return nil
}
l.Info("init plugin module")
//// 启动自身grpc服务
//p.plgServer = newPluginServer()
//var err error
//p.grpcServer, err = pluginrpc.NewPluginGrpcServer(HadesPluginSocket, p.plgServer)
//if err != nil {
// return fmt.Errorf("NewPluginGrpcServer err: %v", err)
//}
// 启动插件管理器,运行各插件进程
pluginmgr.EnableValidatePluginMD5 = option.Opt.EnableValidatePluginMD5
pluginmgr.InitDur = option.Opt.PluginStatusCheckDur
plgCfgMap, err := pluginmgr.LoadPluginConfigWithData([]byte(defaultPluginMgrConf))
if err != nil {
return fmt.Errorf("LoadPluginConfigWithData err: %v", err)
}
for name, plg := range plgCfgMap {
l.Info("plugin config:", name, plg)
}
p.plgConf = plgCfgMap
p.processMgr = pluginmgr.NewProcessMgr(p.plgConf)
p.processMgr.Start()
// 建立与各插件进程的grpc连接
p.grpcClient, err = pluginrpc.NewPluginGrpcClient(defaultPluginCliConf)
if err != nil {
return fmt.Errorf("NewGrpcClientHelper err: %v", err)
}
// 等待所有插件运行正常
allIsRunning := false
for i := 0; i < option.Opt.PluginStatusCheckTimes; i++ {
if p.processMgr.AllPluginProcessIsRunning() {
allIsRunning = true
break
}
l.Info("all plugin process are init ...")
time.Sleep(option.Opt.PluginStatusCheckDur)
}
if !allIsRunning {
return fmt.Errorf("plugin processes init failed")
}
go func() {
var sig string
var ok bool
for {
select {
case sig, ok = <-p.signal:
if !ok {
p.Stop()
l.Info("module resources exit")
return
} else {
if len(sig) < 120 {
l.Info("module resources receive signal:", sig)
} else {
l.Info("module resources receive signal:", sig[:120])
}
p.dealWithSig(sig)
}
}
}
}()
p.isRunning = true
l.Info("init plugin module success")
return nil
}
func (p *Plugin) Name() string {
return global.PluginModuleName
}
func (p *Plugin) IsRunning() bool {
return p.isRunning
}
func (p *Plugin) Stop() {
if p.IsRunning() {
if p.processMgr != nil {
p.processMgr.Stop()
}
if p.grpcClient != nil {
p.grpcClient.Close()
}
if p.grpcServer != nil {
p.grpcServer.Close()
}
close(p.signal)
p.isRunning = false
}
}
func (p *Plugin) Receive(msg string) error {
if len(p.signal) > (p.signalSize - 2) {
return l.WrapError("signal chan is full, drop msg:", msg)
}
p.signal <- msg
return nil
}
func (p *Plugin) dealWithSig(msgStr string) {
splits := strings.SplitN(msgStr, ":", 3)
if len(splits) != 3 {
l.Warn("bad msg fmt from server", msgStr)
return
}
msgType := splits[1]
defer common.TimeCost("deal sig finished, msgType:" + msgType)()
l.Info("ignore sig:", msgStr)
}
package plugin
import (
"context"
"linkfog.com/pluginx/pluginrpc"
pb "linkfog.com/pluginx/proto"
"linkfog.com/public/lib/l"
)
type pluginServer struct {
pb.UnimplementedPluginServer
}
func newPluginServer() *pluginServer {
s := &pluginServer{}
return s
}
func (s *pluginServer) Call(ctx context.Context, req *pb.Req) (*pb.Res, error) {
res := pluginrpc.NewRes(req, 0, "success", []byte("pong"))
return res, nil
}
func (s *pluginServer) SendFile(stream pb.Plugin_SendFileServer) error {
return pluginrpc.SendFileHelper(stream, SendFileHandle)
}
func SendFileHandle(fs *pb.FileStream, filePath string) {
l.Info("recv file header:", fs.Header)
l.Infof("recv file name:%s, purpose:%s, size:%d, path:%s", fs.Name, fs.Purpose, fs.TotalSize, filePath)
}
package resources
import (
"time"
"github.com/shirou/gopsutil/cpu"
"github.com/shirou/gopsutil/load"
)
type HostCPUStatesPct cpu.TimesStat
type HostCPUTimesStatWrap struct {
CPUTimesStats []cpu.TimesStat
Ts int64
}
func GetHostCPUCount() (int, error) {
return cpu.Counts(true)
}
func GetHostCPULoad() (*load.AvgStat, error) {
return load.Avg()
}
// GetHostCPUUsage calc now and last call within gopsutil/cpu
func GetHostCPUUsage() (float64, error) {
return GetHostCPUUsageWithInterval(0)
}
// GetHostCPUUsageWithInterval if d is 0, calc now and last call within gopsutil/cpu.
// if d is not 0, calc two call that need sleep within gopsutil/cpu.
func GetHostCPUUsageWithInterval(d time.Duration) (float64, error) {
var total float64
usages, err := cpu.Percent(d, false)
if err != nil {
return total, err
}
for _, usage := range usages {
total += usage
}
return total, nil
}
func GetHostCPUStatesWithInterval(d time.Duration) ([]HostCPUStatesPct, error) {
states := make([]HostCPUStatesPct, 0)
pre, err := cpu.Times(false)
if err != nil {
return states, err
}
preWrap := HostCPUTimesStatWrap{CPUTimesStats: pre, Ts: time.Now().Unix()}
time.Sleep(d)
now, err := cpu.Times(false)
if err != nil {
return states, err
}
nowWrap := HostCPUTimesStatWrap{CPUTimesStats: now, Ts: time.Now().Unix()}
return calculateCPUStates(&preWrap, &nowWrap), nil
}
func GetHostCPUStatesWithPre(preWrap *HostCPUTimesStatWrap) (
[]HostCPUStatesPct, *HostCPUTimesStatWrap, error) {
states := make([]HostCPUStatesPct, 0)
now, err := cpu.Times(false)
if err != nil {
return states, nil, err
}
nowWrap := HostCPUTimesStatWrap{CPUTimesStats: now, Ts: time.Now().Unix()}
if preWrap == nil {
return calculateCPUStates(&nowWrap, &nowWrap), &nowWrap, nil
}
return calculateCPUStates(preWrap, &nowWrap), &nowWrap, nil
}
func calculateCPUStates(preWrap, nowWrap *HostCPUTimesStatWrap) []HostCPUStatesPct {
states := make([]HostCPUStatesPct, 0)
duration := nowWrap.Ts - preWrap.Ts
for _, now := range nowWrap.CPUTimesStats {
state := HostCPUStatesPct{
CPU: now.CPU,
}
for _, pre := range preWrap.CPUTimesStats {
if pre.CPU == now.CPU {
state.User = calcPct(pre.User, now.User, float64(duration))
state.System = calcPct(pre.System, now.System, float64(duration))
state.Nice = calcPct(pre.Nice, now.Nice, float64(duration))
state.Idle = calcPct(pre.Idle, now.Idle, float64(duration))
state.Iowait = calcPct(pre.Iowait, now.Iowait, float64(duration))
state.Irq = calcPct(pre.Irq, now.Irq, float64(duration))
state.Softirq = calcPct(pre.Softirq, now.Softirq, float64(duration))
state.Steal = calcPct(pre.Steal, now.Steal, float64(duration))
state.Guest = calcPct(pre.Guest, now.Guest, float64(duration))
state.GuestNice = calcPct(pre.GuestNice, now.GuestNice, float64(duration))
break
}
}
states = append(states, state)
}
return states
}
func calcPct(pre, now, second float64) float64 {
diff := now - pre
if diff > 0 && second > 0 {
return (diff / second) * 100
}
return 0
}
package resources
import (
"sort"
"time"
"github.com/shirou/gopsutil/disk"
)
type HostDiskIOUsage struct {
Device string
ReadBytes uint64
WriteBytes uint64
ReadUsage uint64
WriteUsage uint64
}
type HostDiskIOUsageList []HostDiskIOUsage
func (e HostDiskIOUsageList) Len() int {
return len(e)
}
func (e HostDiskIOUsageList) Less(i, j int) bool {
return e[i].Device < e[j].Device
}
func (e HostDiskIOUsageList) Swap(i, j int) {
e[i], e[j] = e[j], e[i]
}
type HostDiskIOCountersStatWrap struct {
IOStat map[string]disk.IOCountersStat
Ts int64
}
func GetHostDiskIOUsageWithInterval(d time.Duration) ([]HostDiskIOUsage, error) {
usages := make([]HostDiskIOUsage, 0)
pre, err := disk.IOCounters()
if err != nil {
return usages, err
}
preWrap := HostDiskIOCountersStatWrap{IOStat: pre, Ts: time.Now().Unix()}
time.Sleep(d)
now, err := disk.IOCounters()
if err != nil {
return usages, err
}
nowWrap := HostDiskIOCountersStatWrap{IOStat: now, Ts: time.Now().Unix()}
return calculateDiskIOUsage(&preWrap, &nowWrap), nil
}
func GetHostDiskIOUsageWithPre(preWrap *HostDiskIOCountersStatWrap) (
[]HostDiskIOUsage, *HostDiskIOCountersStatWrap, error) {
usages := make([]HostDiskIOUsage, 0)
now, err := disk.IOCounters()
if err != nil {
return usages, nil, err
}
nowWrap := HostDiskIOCountersStatWrap{IOStat: now, Ts: time.Now().Unix()}
if preWrap == nil {
return calculateDiskIOUsage(&nowWrap, &nowWrap), &nowWrap, nil
}
return calculateDiskIOUsage(preWrap, &nowWrap), &nowWrap, nil
}
func calculateDiskIOUsage(preWrap, nowWrap *HostDiskIOCountersStatWrap) []HostDiskIOUsage {
usages := make([]HostDiskIOUsage, 0)
duration := nowWrap.Ts - preWrap.Ts
for dev, nowStat := range nowWrap.IOStat {
usage := HostDiskIOUsage{
Device: dev,
ReadBytes: nowStat.ReadBytes,
WriteBytes: nowStat.WriteBytes,
}
if preStat, ok := preWrap.IOStat[dev]; ok {
rdiff := (nowStat.ReadBytes - preStat.ReadBytes)
if rdiff > 0 && duration > 0 {
usage.ReadUsage = rdiff / uint64(duration)
}
wdiff := (nowStat.WriteBytes - preStat.WriteBytes)
if wdiff > 0 && duration > 0 {
usage.WriteUsage = wdiff / uint64(duration)
}
}
usages = append(usages, usage)
}
sort.Sort(HostDiskIOUsageList(usages))
return usages
}
package resources
import (
"sort"
"github.com/shirou/gopsutil/disk"
)
type HostDiskSpaceUsage struct {
PartitionStat disk.PartitionStat
UsageStat *disk.UsageStat
}
type HostDiskSpaceUsageList []HostDiskSpaceUsage
func (e HostDiskSpaceUsageList) Len() int {
return len(e)
}
func (e HostDiskSpaceUsageList) Less(i, j int) bool {
return e[i].PartitionStat.Device < e[j].PartitionStat.Device
}
func (e HostDiskSpaceUsageList) Swap(i, j int) {
e[i], e[j] = e[j], e[i]
}
// func GetDockerRootDiskSpaceUsage() (*disk.UsageStat, error) {
// dockerInfo, err := containerRuntime.New().GetInfo()
// if err != nil {
// return nil, err
// }
//
// diskSpaceUsage, err := disk.Usage(dockerInfo.DockerRootDir)
// if err != nil {
// return nil, err
// }
//
// return diskSpaceUsage, nil
// }
func GetHostDiskSpaceUsage() ([]HostDiskSpaceUsage, error) {
diskSpaceUsages := make([]HostDiskSpaceUsage, 0)
partitionStats, err := disk.Partitions(false)
if err != nil {
return diskSpaceUsages, err
}
for _, partitionStat := range partitionStats {
usageStat, err := disk.Usage(partitionStat.Mountpoint)
if err != nil {
return diskSpaceUsages, err
}
item := HostDiskSpaceUsage{
PartitionStat: partitionStat,
UsageStat: usageStat,
}
diskSpaceUsages = append(diskSpaceUsages, item)
}
sort.Sort(HostDiskSpaceUsageList(diskSpaceUsages))
return diskSpaceUsages, nil
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment