// Package plugin starts the agent's plugin processes, connects to them over
// gRPC and dispatches incoming control messages to them.
package plugin

import (
	"fmt"
	"sync"
	"time"

	"agent/cmd/agent/global"
	"agent/cmd/agent/option"

	"linkfog.com/pluginx/pluginmgr"
	"linkfog.com/pluginx/pluginrpc"
	"linkfog.com/public/lib/common"
	"linkfog.com/public/lib/l"
)

// defaultPluginCliConf maps a plugin name to the socket constant used when
// creating its gRPC client. Start fills it in for the plugin names it
// recognizes in the loaded plugin configuration.
var defaultPluginCliConf = make(map[string]string)

// Plugin is the plugin module: it owns the plugin process manager, the gRPC
// client used to talk to the plugin processes, and a buffered channel of
// pending messages consumed by a background goroutine started in Start.
type Plugin struct {
	signal     chan *global.Message
	signalSize int
	isRunning  bool
	plgServer  *pluginServer
	plgConf    map[string]*pluginmgr.PluginProcessConf
	processMgr *pluginmgr.PluginProcessMgr
	grpcServer *pluginrpc.PluginGrpcServer
	grpcClient *pluginrpc.PluginGrpcClient

	// stateMu guards isRunning and signalClosed and serializes sends on
	// signal against its closing. It is deliberately separate from the
	// embedded Mutex below, which is part of the type's method set and may
	// be locked by code outside this file.
	stateMu sync.Mutex
	// signalClosed records that signal has been closed, so that Receive
	// never sends on it and Start never spawns a consumer for it.
	signalClosed bool

	sync.Mutex
}

// PluginOpt customizes a Plugin while it is being built by New.
type PluginOpt func(*Plugin)

// New builds a Plugin with a default signal buffer of 512 messages, applies
// opts in order, and then allocates the signal channel with the resulting
// buffer size.
func New(opts ...PluginOpt) *Plugin {
	p := Plugin{
		signalSize: 512,
	}

	for _, opt := range opts {
		opt(&p)
	}

	// Defined elsewhere in this package.
	setPluginSocketPath()

	// Allocated after the options ran so an option may change signalSize.
	p.signal = make(chan *global.Message, p.signalSize)
	return &p
}

// Start launches the plugin processes, connects to them, waits until they all
// report running and then starts the goroutine that consumes messages queued
// by Receive.
//
// It returns nil immediately when the module is already running. If any step
// fails, everything started by this call is released again before the error
// is returned. A Plugin whose signal channel has been closed (for example by
// Stop) cannot be started again; create a new one with New.
func (p *Plugin) Start() error {
	p.stateMu.Lock()
	running, closed := p.isRunning, p.signalClosed
	p.stateMu.Unlock()

	if running {
		return nil
	}
	if closed {
		// Fail before launching any process: a consumer on a closed channel
		// would exit at once and the module could never handle a message.
		return fmt.Errorf("plugin module cannot be started again after its signal channel was closed")
	}

	l.Info("init plugin module")

	if err := p.launchPlugins(); err != nil {
		return err
	}

	// Mark the module as running before the consumer starts so that a
	// shutdown triggered by the consumer observes a consistent state.
	p.stateMu.Lock()
	p.isRunning = true
	p.stateMu.Unlock()

	go p.consumeSignals()

	l.Info("init plugin module success")
	return nil
}

// launchPlugins loads the plugin configuration, starts the process manager and
// establishes the gRPC connections. On failure it stops the process manager
// and closes the gRPC client it created, so nothing is left running behind a
// module that reports itself as not running.
func (p *Plugin) launchPlugins() error {
	// Starting this module's own gRPC server is currently disabled:
	//p.plgServer = newPluginServer()
	//var err error
	//p.grpcServer, err = pluginrpc.NewPluginGrpcServer(HadesPluginSocket, p.plgServer)
	//if err != nil {
	//	return fmt.Errorf("NewPluginGrpcServer err: %v", err)
	//}

	// Start the plugin manager, which runs each plugin process.
	pluginmgr.EnableValidatePluginMD5 = option.Opt.EnableValidatePluginMD5
	pluginmgr.InitDur = option.Opt.PluginStatusCheckDur

	plgCfgMap, err := pluginmgr.LoadPluginConfigWithFile(defaultPluginMgrConfFile)
	if err != nil {
		// A missing or broken config file is not fatal: continue with no
		// configured plugins.
		l.Warnf("load plugin config file failed, %v", err)
		plgCfgMap = make(map[string]*pluginmgr.PluginProcessConf)
	}

	for name, plg := range plgCfgMap {
		switch name {
		case PushStreamingPlugin:
			defaultPluginCliConf[name] = PushStreamingPluginSocket
		case ReportDCSInfoPlugin:
			defaultPluginCliConf[name] = ReportDCSInfoPluginSocket
		}
		l.Info("plugin config:", name, plg)
	}

	p.plgConf = plgCfgMap
	p.processMgr = pluginmgr.NewProcessMgr(p.plgConf)
	p.processMgr.Start()

	if err := p.connectAndWait(); err != nil {
		// Undo what this call started, in the same order Stop uses.
		p.processMgr.Stop()
		p.processMgr = nil
		if p.grpcClient != nil {
			p.grpcClient.Close()
			p.grpcClient = nil
		}
		return err
	}
	return nil
}

// connectAndWait creates the gRPC client, registers a client for every entry
// of defaultPluginCliConf and then polls the process manager, up to
// option.Opt.PluginStatusCheckTimes times with option.Opt.PluginStatusCheckDur
// between attempts, until all plugin processes are running.
func (p *Plugin) connectAndWait() error {
	// Establish the gRPC connections to the plugin processes.
	client, err := pluginrpc.New()
	if err != nil {
		return fmt.Errorf("NewGrpcClientHelper err: %w", err)
	}
	p.grpcClient = client

	if len(defaultPluginCliConf) > 0 {
		if err := p.grpcClient.NewPluginClient(defaultPluginCliConf); err != nil {
			l.Error(err)
			return err
		}
	}

	// Wait until every plugin is running normally.
	for i := 0; i < option.Opt.PluginStatusCheckTimes; i++ {
		if p.processMgr.AllPluginProcessIsRunning() {
			return nil
		}
		l.Info("all plugin process are init ...")
		time.Sleep(option.Opt.PluginStatusCheckDur)
	}
	return fmt.Errorf("plugin processes init failed")
}

// consumeSignals handles queued messages until the signal channel is closed
// and drained, then makes sure the module's resources are released.
func (p *Plugin) consumeSignals() {
	for sig := range p.signal {
		p.dealWithSig(sig)
	}

	// The channel is already closed here, so it must not be closed again.
	p.shutdown(false)
	l.Info("module resources exit")
}

// Name returns the module name, global.PluginModuleName.
func (p *Plugin) Name() string {
	return global.PluginModuleName
}

// IsRunning reports whether Start has completed successfully and the module
// has not been stopped since.
func (p *Plugin) IsRunning() bool {
	p.stateMu.Lock()
	defer p.stateMu.Unlock()
	return p.isRunning
}

// Stop stops the plugin process manager, closes the gRPC client and server
// when present, and closes the signal channel, which ends the consumer
// goroutine. It does nothing when the module is not running, so it is safe to
// call more than once and from several goroutines.
func (p *Plugin) Stop() {
	p.shutdown(true)
}

// shutdown releases the module's resources exactly once per run.
// closeSignal is false when the caller has already observed the signal
// channel as closed; closing it a second time would panic.
func (p *Plugin) shutdown(closeSignal bool) {
	p.stateMu.Lock()
	if !p.isRunning {
		p.stateMu.Unlock()
		return
	}
	// Flip both flags atomically: a concurrent shutdown becomes a no-op and
	// Receive stops sending before the channel is closed below.
	p.isRunning = false
	p.signalClosed = true
	p.stateMu.Unlock()

	if p.processMgr != nil {
		p.processMgr.Stop()
	}
	if p.grpcClient != nil {
		p.grpcClient.Close()
	}
	if p.grpcServer != nil {
		p.grpcServer.Close()
	}

	if closeSignal {
		close(p.signal)
	}
}

// Receive queues msg for the consumer goroutine without blocking.
//
// It returns an error and drops msg when the module has been stopped or when
// the buffer is nearly full (more than signalSize-2 messages pending).
func (p *Plugin) Receive(msg *global.Message) error {
	// Holding stateMu across the check and the send guarantees the channel
	// cannot be closed in between, which would make the send panic.
	p.stateMu.Lock()
	defer p.stateMu.Unlock()

	if p.signalClosed {
		return l.WrapError("plugin manager is stopped, drop msg:", msg.Key)
	}
	if len(p.signal) > (p.signalSize - 2) {
		return l.WrapError("plugin manager signal chan is full, drop msg:", msg.Key)
	}

	select {
	case p.signal <- msg:
		return nil
	default:
		// Never block while holding stateMu.
		return l.WrapError("plugin manager signal chan is full, drop msg:", msg.Key)
	}
}

// dealWithSig dispatches one message according to its Key. Only
// global.ConsumerTopicStartupPlugin triggers work at the moment; the upgrade
// and stop topics are recognized but have no handler yet.
func (p *Plugin) dealWithSig(msg *global.Message) {
	// TimeCost is called now and the function it returns runs on exit;
	// presumably it reports the elapsed time — confirm in common.TimeCost.
	defer common.TimeCost("deal sig finished, msgType:" + msg.Key)()

	switch msg.Key {
	case global.ConsumerTopicPluginUpgrade:
		// No action implemented.
	case global.ConsumerTopicStartupPlugin:
		// Runs in its own goroutine so the consumer loop is not held up.
		go chatWithPlugin(p.grpcClient, ReportDCSInfoPlugin, global.PublishTopicReportInfo)
	case global.ConsumerTopicStopPlugin:
		// No action implemented.
	}
}