Commit c9ec1358 authored by Lei Li's avatar Lei Li
Browse files

feat: 插件管理器支持增量处理

parent 175d0ee6
...@@ -4,7 +4,7 @@ go 1.23.2 ...@@ -4,7 +4,7 @@ go 1.23.2
replace linkfog.com/public => web.lueluesay.top/git/lil/public v0.0.0-20241015034013-544ad0ec734d replace linkfog.com/public => web.lueluesay.top/git/lil/public v0.0.0-20241015034013-544ad0ec734d
replace linkfog.com/pluginx => web.lueluesay.top/git/lil/pluginx v0.0.0-20241014064823-a6e286ccb5cd replace linkfog.com/pluginx => web.lueluesay.top/git/lil/pluginx v0.0.0-20241017031415-3651ccc20390
require ( require (
github.com/beevik/ntp v1.4.3 github.com/beevik/ntp v1.4.3
......
...@@ -58,7 +58,7 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntN ...@@ -58,7 +58,7 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntN
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
web.lueluesay.top/git/lil/pluginx v0.0.0-20241014064823-a6e286ccb5cd h1:KaqJZ9MQkJ2mQDHXe9pm5MIgW5xpzNekj4OYSYn4cR4= web.lueluesay.top/git/lil/pluginx v0.0.0-20241017031415-3651ccc20390 h1:V3uBqmiRc4xlBWfHRDETMGHTvLF+3mOGIZH/E46XcNI=
web.lueluesay.top/git/lil/pluginx v0.0.0-20241014064823-a6e286ccb5cd/go.mod h1:DL73qsfCIFfH2K6tdOLPj9tRcZ8AYxfPrkcTebYXr5o= web.lueluesay.top/git/lil/pluginx v0.0.0-20241017031415-3651ccc20390/go.mod h1:DL73qsfCIFfH2K6tdOLPj9tRcZ8AYxfPrkcTebYXr5o=
web.lueluesay.top/git/lil/public v0.0.0-20241015034013-544ad0ec734d h1:vScdJPO4B87cVCZU26EddFpdQTDxTocD2RSLcoHwy5k= web.lueluesay.top/git/lil/public v0.0.0-20241015034013-544ad0ec734d h1:vScdJPO4B87cVCZU26EddFpdQTDxTocD2RSLcoHwy5k=
web.lueluesay.top/git/lil/public v0.0.0-20241015034013-544ad0ec734d/go.mod h1:x/nRP9pMRVToI9Te1TazybP0Qlj3V+/aA2EiPQEvzsI= web.lueluesay.top/git/lil/public v0.0.0-20241015034013-544ad0ec734d/go.mod h1:x/nRP9pMRVToI9Te1TazybP0Qlj3V+/aA2EiPQEvzsI=
...@@ -2,6 +2,7 @@ package plugin ...@@ -2,6 +2,7 @@ package plugin
import ( import (
"errors" "errors"
"path/filepath"
"time" "time"
"agent/cmd/agent/config" "agent/cmd/agent/config"
...@@ -9,8 +10,8 @@ import ( ...@@ -9,8 +10,8 @@ import (
// 插件名 // 插件名
const ( const (
HadesPlugin = "agent" PushStreamingPlugin = "pushStreaming"
DobermanPlugin = "doberman" ReportDCSInfoPlugin = "reportDCSInfo"
) )
// 插件函数 // 插件函数
...@@ -25,8 +26,10 @@ const ( ...@@ -25,8 +26,10 @@ const (
) )
var ( var (
HadesPluginSocket = config.PluginDir + "/" + HadesPlugin + ".sock" PushStreamingPluginSocket = PushStreamingPlugin + ".sock"
DobermanPluginSocket = config.PluginDir + "/" + DobermanPlugin + ".sock" ReportDCSInfoPluginSocket = ReportDCSInfoPlugin + ".sock"
defaultPluginMgrConfFile = "plugin.conf"
DefaultCallTimeout = 5 * time.Second DefaultCallTimeout = 5 * time.Second
ErrDisconnected = errors.New("doberman plugin disconnected") ErrDisconnected = errors.New("doberman plugin disconnected")
...@@ -40,3 +43,9 @@ type Msg struct { ...@@ -40,3 +43,9 @@ type Msg struct {
type CommMsg struct { type CommMsg struct {
MsgType string MsgType string
} }
func setPluginSocketPath() {
PushStreamingPluginSocket = filepath.Join(config.PluginDir, PushStreamingPluginSocket)
ReportDCSInfoPluginSocket = filepath.Join(config.PluginDir, ReportDCSInfoPluginSocket)
defaultPluginMgrConfFile = filepath.Join(config.PluginDir, defaultPluginMgrConfFile)
}
package plugin package plugin
import ( import (
"agent/cmd/agent/global"
"fmt" "fmt"
"linkfog.com/public/lib/common"
"sync" "sync"
"time" "time"
"agent/cmd/agent/global"
"agent/cmd/agent/option" "agent/cmd/agent/option"
"linkfog.com/pluginx/pluginmgr" "linkfog.com/pluginx/pluginmgr"
"linkfog.com/pluginx/pluginrpc" "linkfog.com/pluginx/pluginrpc"
"linkfog.com/public/lib/common"
"linkfog.com/public/lib/l" "linkfog.com/public/lib/l"
) )
var defaultPluginMgrConf string = `{ var defaultPluginCliConf = make(map[string]string)
"doberman": {
"name": "doberman",
"path": "/dosec/plugin/doberman",
"enable": true,
"mem": 629145600,
"md5": "uninitialized"
}
}`
var defaultPluginCliConf = map[string]string{
DobermanPlugin: DobermanPluginSocket,
}
type Plugin struct { type Plugin struct {
signal chan *global.Message signal chan *global.Message
...@@ -48,6 +36,7 @@ func New(opts ...PluginOpt) *Plugin { ...@@ -48,6 +36,7 @@ func New(opts ...PluginOpt) *Plugin {
for _, opt := range opts { for _, opt := range opts {
opt(&p) opt(&p)
} }
setPluginSocketPath()
p.signal = make(chan *global.Message, p.signalSize) p.signal = make(chan *global.Message, p.signalSize)
return &p return &p
...@@ -70,11 +59,19 @@ func (p *Plugin) Start() error { ...@@ -70,11 +59,19 @@ func (p *Plugin) Start() error {
// 启动插件管理器,运行各插件进程 // 启动插件管理器,运行各插件进程
pluginmgr.EnableValidatePluginMD5 = option.Opt.EnableValidatePluginMD5 pluginmgr.EnableValidatePluginMD5 = option.Opt.EnableValidatePluginMD5
pluginmgr.InitDur = option.Opt.PluginStatusCheckDur pluginmgr.InitDur = option.Opt.PluginStatusCheckDur
plgCfgMap, err := pluginmgr.LoadPluginConfigWithData([]byte(defaultPluginMgrConf)) plgCfgMap, err := pluginmgr.LoadPluginConfigWithFile(defaultPluginMgrConfFile)
if err != nil { if err != nil {
return fmt.Errorf("LoadPluginConfigWithData err: %v", err) l.Warnf("load plugin config file failed, %v", err)
plgCfgMap = make(map[string]*pluginmgr.PluginProcessConf)
} }
for name, plg := range plgCfgMap { for name, plg := range plgCfgMap {
if name == PushStreamingPlugin {
defaultPluginCliConf[name] = PushStreamingPluginSocket
} else if name == ReportDCSInfoPlugin {
defaultPluginCliConf[name] = ReportDCSInfoPluginSocket
}
l.Info("plugin config:", name, plg) l.Info("plugin config:", name, plg)
} }
p.plgConf = plgCfgMap p.plgConf = plgCfgMap
...@@ -82,10 +79,17 @@ func (p *Plugin) Start() error { ...@@ -82,10 +79,17 @@ func (p *Plugin) Start() error {
p.processMgr.Start() p.processMgr.Start()
// 建立与各插件进程的grpc连接 // 建立与各插件进程的grpc连接
p.grpcClient, err = pluginrpc.NewPluginGrpcClient(defaultPluginCliConf) p.grpcClient, err = pluginrpc.New()
if err != nil { if err != nil {
return fmt.Errorf("NewGrpcClientHelper err: %v", err) return fmt.Errorf("NewGrpcClientHelper err: %v", err)
} }
if len(defaultPluginCliConf) > 0 {
err := p.grpcClient.NewPluginClient(defaultPluginCliConf)
if err != nil {
l.Error(err)
return err
}
}
// 等待所有插件运行正常 // 等待所有插件运行正常
allIsRunning := false allIsRunning := false
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment