package pluginmgr import ( "sync" "time" "linkfog.com/public/lib/l" ) type PluginProcessMgr struct { PluginProcessConfMap map[string]*PluginProcessConf pluginProcessMap map[string]*pluginProcess mu *sync.Mutex // for PluginProcessConfMap, pluginProcessMap done chan struct{} doneResp chan struct{} isRunning bool sync.Mutex // for Start, Stop } func NewProcessMgr(conf map[string]*PluginProcessConf) *PluginProcessMgr { mgr := &PluginProcessMgr{ PluginProcessConfMap: conf, pluginProcessMap: make(map[string]*pluginProcess), mu: &sync.Mutex{}, } return mgr } func (mgr *PluginProcessMgr) Start() { mgr.Lock() defer mgr.Unlock() if mgr.isRunning { return } mgr.isRunning = true mgr.done = make(chan struct{}) mgr.doneResp = make(chan struct{}) l.Info("plugin process mgr start") go mgr.run() } func (mgr *PluginProcessMgr) Stop() { mgr.Lock() defer mgr.Unlock() if !mgr.isRunning { return } start := time.Now() mgr.isRunning = false close(mgr.done) <-mgr.doneResp mgr.mu.Lock() defer mgr.mu.Unlock() mgr.unregisterAllProcess() mgr.PluginProcessConfMap = make(map[string]*PluginProcessConf) mgr.pluginProcessMap = make(map[string]*pluginProcess) l.Infof("plugin process mgr has stopped, cost %v", time.Since(start)) } func (mgr *PluginProcessMgr) run() { mgr.detection() ticker := time.NewTicker(InitDur) defer ticker.Stop() defer close(mgr.doneResp) for { select { case <-ticker.C: mgr.detection() case <-mgr.done: l.Info("plugin process mgr received end signal") return } } } func (mgr *PluginProcessMgr) detection() { mgr.mu.Lock() defer mgr.mu.Unlock() // 基于进程map和配置map,清除无用的插件进程 for name, p := range mgr.pluginProcessMap { if _, ok := mgr.PluginProcessConfMap[name]; !ok { l.Infof("unregister useless plugin process %s", name) mgr.unregisterProcess(name, p) } } // 基于配置map和进程map,确保进程状态与配置完全同步 for _, conf := range mgr.PluginProcessConfMap { if !conf.Enable { // 插件进程不可用,但进程存在,需要关闭 if p, ok := mgr.pluginProcessMap[conf.Name]; ok { l.Infof("plugin process %s disable, unregister", conf.Name) mgr.unregisterProcess(conf.Name, p) } continue } // 插件进程不存在,启动 p, ok := mgr.pluginProcessMap[conf.Name] if !ok { l.Infof("register plugin process %s", conf.Name) mgr.registerProcess(conf) continue } // 插件进程异常退出,重启 if p.isExited() { l.Warnf("plugin process %s abnormal exited, restart", conf.Name) mgr.registerProcess(conf) continue } // 获取插件进程内存rss rss, err := GetProcRSS(p.pid()) if err != nil { l.Warnf("get plugin process %s memory rss err: %v", conf.Name, err) continue } l.Infof("plugin process %s memory rss: %d", conf.Name, rss) // 插件进程内存rss达到限制,重启 if rss > conf.Mem { l.Warn("plugin process %s memory rss(%d) reach limit(%d), restart", conf.Name, rss, conf.Mem) p.shutdown() mgr.registerProcess(conf) } } } func (mgr *PluginProcessMgr) ReloadConf(conf map[string]*PluginProcessConf) { mgr.mu.Lock() defer mgr.mu.Unlock() mgr.PluginProcessConfMap = conf } func (mgr *PluginProcessMgr) registerProcess(conf *PluginProcessConf) { p, err := newProcess(conf) if err != nil { l.Errorf("new plugin process %s err: %v", conf.Name, err) return } mgr.pluginProcessMap[conf.Name] = p } func (mgr *PluginProcessMgr) unregisterProcess(name string, p *pluginProcess) { p.shutdown() if !p.isExited() { l.Error("shutdown plugin process failed") return } delete(mgr.pluginProcessMap, name) } func (mgr *PluginProcessMgr) unregisterAllProcess() { l.Infof("shutdown all plugin process") for name, p := range mgr.pluginProcessMap { l.Infof("shutdown plugin process %s", name) mgr.unregisterProcess(name, p) } } func (mgr *PluginProcessMgr) AllPluginProcessIsRunning() bool { for _, p := range mgr.pluginProcessMap { if p.isExited() { return false } } return true }