package plugin import ( "agent/cmd/agent/config" "agent/pkg/file" "encoding/json" "fmt" "os" "path/filepath" "sync" "time" "agent/cmd/agent/global" "agent/cmd/agent/option" "linkfog.com/pluginx/pluginmgr" "linkfog.com/pluginx/pluginrpc" "linkfog.com/public/lib/common" pFile "linkfog.com/public/lib/file" "linkfog.com/public/lib/l" ) var defaultPluginCliConf = make(map[string]string) type Plugin struct { signal chan *global.Message signalSize int isRunning bool plgServer *pluginServer plgConf map[string]*pluginmgr.PluginProcessConf processMgr *pluginmgr.PluginProcessMgr grpcServer *pluginrpc.PluginGrpcServer grpcClient *pluginrpc.PluginGrpcClient sync.Mutex } type PluginOpt func(*Plugin) func New(opts ...PluginOpt) *Plugin { p := Plugin{ signalSize: 512, } for _, opt := range opts { opt(&p) } setPluginSocketPath() p.signal = make(chan *global.Message, p.signalSize) return &p } func (p *Plugin) Start() error { if p.IsRunning() { return nil } l.Info("init plugin module") //// 启动自身grpc服务 //p.plgServer = newPluginServer() //var err error //p.grpcServer, err = pluginrpc.NewPluginGrpcServer(HadesPluginSocket, p.plgServer) //if err != nil { // return fmt.Errorf("NewPluginGrpcServer err: %v", err) //} // 启动插件管理器,运行各插件进程 pluginmgr.EnableValidatePluginMD5 = option.Opt.EnableValidatePluginMD5 pluginmgr.InitDur = option.Opt.PluginStatusCheckDur plgCfgMap, err := pluginmgr.LoadPluginConfigWithFile(defaultPluginMgrConfFile) if err != nil { l.Warnf("load plugin config file failed, %v", err) plgCfgMap = make(map[string]*pluginmgr.PluginProcessConf) } for name, plg := range plgCfgMap { if name == PushStreamingPlugin { defaultPluginCliConf[name] = PushStreamingPluginSocket } else if name == ReportDCSInfoPlugin { defaultPluginCliConf[name] = ReportDCSInfoPluginSocket } l.Info("plugin config:", name, plg) } p.plgConf = plgCfgMap p.processMgr = pluginmgr.NewProcessMgr(p.plgConf) p.processMgr.Start() // 建立与各插件进程的grpc连接 p.grpcClient, err = pluginrpc.New() if err != nil { return fmt.Errorf("NewGrpcClientHelper err: %v", err) } if len(defaultPluginCliConf) > 0 { err := p.grpcClient.NewPluginClient(defaultPluginCliConf) if err != nil { l.Error(err) return err } } // 等待所有插件运行正常 allIsRunning := false for i := 0; i < option.Opt.PluginStatusCheckTimes; i++ { if p.processMgr.AllPluginProcessIsRunning() { allIsRunning = true break } l.Info("all plugin process are init ...") time.Sleep(option.Opt.PluginStatusCheckDur) } if !allIsRunning { return fmt.Errorf("plugin processes init failed") } go func() { var sig *global.Message var ok bool for { select { case sig, ok = <-p.signal: if !ok { p.Stop() l.Info("module resources exit") return } else { p.dealWithSig(sig) } } } }() p.isRunning = true l.Info("init plugin module success") return nil } func (p *Plugin) Name() string { return global.PluginModuleName } func (p *Plugin) IsRunning() bool { return p.isRunning } func (p *Plugin) Stop() { if p.IsRunning() { if p.processMgr != nil { p.processMgr.Stop() } if p.grpcClient != nil { p.grpcClient.Close() } if p.grpcServer != nil { p.grpcServer.Close() } close(p.signal) p.isRunning = false } } type pluginInfo struct { Name string URL string MD5 string } func (p *Plugin) Receive(msg *global.Message) error { if len(p.signal) > (p.signalSize - 2) { return l.WrapError("plugin manager signal chan is full, drop msg:", msg.Key) } p.signal <- msg return nil } func (p *Plugin) dealWithSig(msg *global.Message) { defer common.TimeCost("deal sig finished, msgType:" + msg.Key)() switch msg.Key { case global.ConsumerTopicPluginUpgrade: // TODO: 更新插件版本 case global.ConsumerTopicStartupPlugin: info := &pluginInfo{} err := json.Unmarshal([]byte(msg.Payload), &info) if err != nil { l.Error(err) return } // 首次启动,该不存在文件 if _, err = os.Stat(filepath.Join(config.PluginDir, info.Name)); err != nil { downloadPlugin(info.URL, config.PluginDir, info.Name) } else { md5, err := pFile.GetFileMD5(filepath.Join(config.PluginDir, info.Name)) if err != nil { l.Warn(err) } if md5 != info.MD5 { // TODO: 备份 // 下载 downloadPlugin(info.URL, config.PluginDir, info.Name) } } p.processMgr.ChangeSinglePluginConf(&pluginmgr.PluginProcessConf{ Name: info.Name, Path: filepath.Join(config.PluginDir, info.Name), Enable: true, }) // 等待一秒再建立连接 time.Sleep(1 * time.Second) err = p.grpcClient.NewPluginClient(map[string]string{ info.Name: filepath.Join(config.PluginDir, info.Name+".sock"), }) if err != nil { l.Error(err) return } if info.Name == ReportDCSInfoPlugin { go chatWithPlugin(p.grpcClient, ReportDCSInfoPlugin, global.PublishTopicReportInfo) } case global.ConsumerTopicStopPlugin: info := &pluginInfo{} err := json.Unmarshal([]byte(msg.Payload), &info) if err != nil { l.Error(err) return } p.processMgr.ChangeSinglePluginConf(&pluginmgr.PluginProcessConf{ Name: info.Name, Path: filepath.Join(config.PluginDir, info.Name), Enable: false, }) // grpc连接,server被kill掉,连接自动断开,故客户端不用主动关闭 } } func downloadPlugin(url, dir, name string) { err := global.DownloadFile(url, dir, name) if err != nil { l.Errorf("download file err:%v", err) return } // 增加执行权限 err = file.SetFileExecPerm(filepath.Join(dir, name)) if err != nil { l.Error(err) } // TODO md5校验 }