Commit c99715d2 authored by Lei Li's avatar Lei Li
Browse files

feat: 插件管理器支持接收启动和关闭插件的处理

parent 2ee5c9f8
package plugin package plugin
import ( import (
"errors"
"path/filepath" "path/filepath"
"time" "time"
...@@ -30,10 +29,11 @@ var ( ...@@ -30,10 +29,11 @@ var (
PushStreamingPluginSocket = PushStreamingPlugin + ".sock" PushStreamingPluginSocket = PushStreamingPlugin + ".sock"
ReportDCSInfoPluginSocket = ReportDCSInfoPlugin + ".sock" ReportDCSInfoPluginSocket = ReportDCSInfoPlugin + ".sock"
PushStreamingPluginBinFile = ""
ReportDCSInfoPluginBinFile = ""
defaultPluginMgrConfFile = "plugin.conf" defaultPluginMgrConfFile = "plugin.conf"
DefaultCallTimeout = 5 * time.Second DefaultCallTimeout = 5 * time.Second
ErrDisconnected = errors.New("doberman plugin disconnected")
) )
type Msg struct { type Msg struct {
...@@ -49,4 +49,6 @@ func setPluginSocketPath() { ...@@ -49,4 +49,6 @@ func setPluginSocketPath() {
PushStreamingPluginSocket = filepath.Join(config.PluginDir, PushStreamingPluginSocket) PushStreamingPluginSocket = filepath.Join(config.PluginDir, PushStreamingPluginSocket)
ReportDCSInfoPluginSocket = filepath.Join(config.PluginDir, ReportDCSInfoPluginSocket) ReportDCSInfoPluginSocket = filepath.Join(config.PluginDir, ReportDCSInfoPluginSocket)
defaultPluginMgrConfFile = filepath.Join(config.PluginDir, defaultPluginMgrConfFile) defaultPluginMgrConfFile = filepath.Join(config.PluginDir, defaultPluginMgrConfFile)
PushStreamingPluginBinFile = filepath.Join(config.PluginDir, PushStreamingPlugin)
ReportDCSInfoPluginBinFile = filepath.Join(config.PluginDir, ReportDCSInfoPlugin)
} }
package plugin package plugin
import ( import (
"agent/cmd/agent/config"
"agent/pkg/file"
"encoding/json"
"fmt" "fmt"
"os"
"path/filepath"
"sync" "sync"
"time" "time"
...@@ -10,6 +15,7 @@ import ( ...@@ -10,6 +15,7 @@ import (
"linkfog.com/pluginx/pluginmgr" "linkfog.com/pluginx/pluginmgr"
"linkfog.com/pluginx/pluginrpc" "linkfog.com/pluginx/pluginrpc"
"linkfog.com/public/lib/common" "linkfog.com/public/lib/common"
pFile "linkfog.com/public/lib/file"
"linkfog.com/public/lib/l" "linkfog.com/public/lib/l"
) )
...@@ -150,6 +156,12 @@ func (p *Plugin) Stop() { ...@@ -150,6 +156,12 @@ func (p *Plugin) Stop() {
} }
} }
type pluginInfo struct {
Name string
URL string
MD5 string
}
func (p *Plugin) Receive(msg *global.Message) error { func (p *Plugin) Receive(msg *global.Message) error {
if len(p.signal) > (p.signalSize - 2) { if len(p.signal) > (p.signalSize - 2) {
return l.WrapError("plugin manager signal chan is full, drop msg:", msg.Key) return l.WrapError("plugin manager signal chan is full, drop msg:", msg.Key)
...@@ -163,9 +175,75 @@ func (p *Plugin) dealWithSig(msg *global.Message) { ...@@ -163,9 +175,75 @@ func (p *Plugin) dealWithSig(msg *global.Message) {
switch msg.Key { switch msg.Key {
case global.ConsumerTopicPluginUpgrade: case global.ConsumerTopicPluginUpgrade:
// TODO: 更新插件版本
case global.ConsumerTopicStartupPlugin: case global.ConsumerTopicStartupPlugin:
go chatWithPlugin(p.grpcClient, ReportDCSInfoPlugin, global.PublishTopicReportInfo) info := &pluginInfo{}
err := json.Unmarshal([]byte(msg.Payload), &info)
if err != nil {
l.Error(err)
return
}
// 首次启动,该不存在文件
if _, err = os.Stat(filepath.Join(config.PluginDir, info.Name)); err != nil {
downloadPlugin(info.URL, config.PluginDir, info.Name)
} else {
md5, err := pFile.GetFileMD5(filepath.Join(config.PluginDir, info.Name))
if err != nil {
l.Warn(err)
}
if md5 != info.MD5 {
// TODO: 备份
// 下载
downloadPlugin(info.URL, config.PluginDir, info.Name)
}
}
p.processMgr.ChangeSinglePluginConf(&pluginmgr.PluginProcessConf{
Name: info.Name,
Path: filepath.Join(config.PluginDir, info.Name),
Enable: true,
})
// 等待一秒再建立连接
time.Sleep(1 * time.Second)
err = p.grpcClient.NewPluginClient(map[string]string{
info.Name: filepath.Join(config.PluginDir, info.Name+".sock"),
})
if err != nil {
l.Error(err)
return
}
if info.Name == ReportDCSInfoPlugin {
go chatWithPlugin(p.grpcClient, ReportDCSInfoPlugin, global.PublishTopicReportInfo)
}
case global.ConsumerTopicStopPlugin: case global.ConsumerTopicStopPlugin:
info := &pluginInfo{}
err := json.Unmarshal([]byte(msg.Payload), &info)
if err != nil {
l.Error(err)
return
}
p.processMgr.ChangeSinglePluginConf(&pluginmgr.PluginProcessConf{
Name: info.Name,
Path: filepath.Join(config.PluginDir, info.Name),
Enable: false,
})
// grpc连接,server被kill掉,连接自动断开,故客户端不用主动关闭
}
}
func downloadPlugin(url, dir, name string) {
err := global.DownloadFile(url, dir, name)
if err != nil {
l.Errorf("download file err:%v", err)
return
}
// 增加执行权限
err = file.SetFileExecPerm(filepath.Join(dir, name))
if err != nil {
l.Error(err)
} }
// TODO md5校验
} }
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment