plugin_proc_mgr.go 3.92 KB
Newer Older
“李磊”'s avatar
“李磊” committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
package pluginmgr

import (
	"sync"
	"time"

	"linkfog.com/public/lib/l"
)

type PluginProcessMgr struct {
	PluginProcessConfMap map[string]*PluginProcessConf
	pluginProcessMap     map[string]*pluginProcess
	mu                   *sync.Mutex // for PluginProcessConfMap, pluginProcessMap
	done                 chan struct{}
	doneResp             chan struct{}
	isRunning            bool
	sync.Mutex           // for Start, Stop
}

func NewProcessMgr(conf map[string]*PluginProcessConf) *PluginProcessMgr {
	mgr := &PluginProcessMgr{
		PluginProcessConfMap: conf,
		pluginProcessMap:     make(map[string]*pluginProcess),
		mu:                   &sync.Mutex{},
	}
	return mgr
}

func (mgr *PluginProcessMgr) Start() {
	mgr.Lock()
	defer mgr.Unlock()
	if mgr.isRunning {
		return
	}

	mgr.isRunning = true
	mgr.done = make(chan struct{})
	mgr.doneResp = make(chan struct{})

	l.Info("plugin process mgr start")
	go mgr.run()
}

func (mgr *PluginProcessMgr) Stop() {
	mgr.Lock()
	defer mgr.Unlock()
	if !mgr.isRunning {
		return
	}

	start := time.Now()
	mgr.isRunning = false
	close(mgr.done)
	<-mgr.doneResp

	mgr.mu.Lock()
	defer mgr.mu.Unlock()
	mgr.unregisterAllProcess()
	mgr.PluginProcessConfMap = make(map[string]*PluginProcessConf)
	mgr.pluginProcessMap = make(map[string]*pluginProcess)
	l.Infof("plugin process mgr has stopped, cost %v", time.Since(start))
}

func (mgr *PluginProcessMgr) run() {
	mgr.detection()
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()
	defer close(mgr.doneResp)
	for {
		select {
		case <-ticker.C:
			mgr.detection()
		case <-mgr.done:
			l.Info("plugin process mgr received end signal")
			return
		}
	}
}

func (mgr *PluginProcessMgr) detection() {
	mgr.mu.Lock()
	defer mgr.mu.Unlock()

	// 基于进程map和配置map,清除无用的插件进程
	for name, p := range mgr.pluginProcessMap {
		if _, ok := mgr.PluginProcessConfMap[name]; !ok {
			l.Infof("unregister useless plugin process %s", name)
			mgr.unregisterProcess(name, p)
		}
	}

	// 基于配置map和进程map,确保进程状态与配置完全同步
	for _, conf := range mgr.PluginProcessConfMap {
		if !conf.Enable {
			// 插件进程不可用,但进程存在,需要关闭
			if p, ok := mgr.pluginProcessMap[conf.Name]; ok {
				l.Infof("plugin process %s disable, unregister", conf.Name)
				mgr.unregisterProcess(conf.Name, p)
			}
			continue
		}

		// 插件进程不存在,启动
		p, ok := mgr.pluginProcessMap[conf.Name]
		if !ok {
			l.Infof("register plugin process %s", conf.Name)
			mgr.registerProcess(conf)
			continue
		}

		// 插件进程异常退出,重启
		if p.isExited() {
			l.Warnf("plugin process %s abnormal exited, restart", conf.Name)
			mgr.registerProcess(conf)
			continue
		}

		// 获取插件进程内存rss
		rss, err := GetProcRSS(p.pid())
		if err != nil {
			l.Warnf("get plugin process %s memory rss err: %v", conf.Name, err)
			continue
		}
		l.Infof("plugin process %s memory rss: %d", conf.Name, rss)

		// 插件进程内存rss达到限制,重启
		if rss > conf.Mem {
			l.Warn("plugin process %s memory rss(%d) reach limit(%d), restart", conf.Name, rss, conf.Mem)
			p.shutdown()
			mgr.registerProcess(conf)
		}
	}
}

func (mgr *PluginProcessMgr) ReloadConf(conf map[string]*PluginProcessConf) {
	mgr.mu.Lock()
	defer mgr.mu.Unlock()
	mgr.PluginProcessConfMap = conf
}

func (mgr *PluginProcessMgr) registerProcess(conf *PluginProcessConf) {
	p, err := newProcess(conf)
	if err != nil {
		l.Errorf("new plugin process %s err: %v", conf.Name, err)
		return
	}
	mgr.pluginProcessMap[conf.Name] = p
}

func (mgr *PluginProcessMgr) unregisterProcess(name string, p *pluginProcess) {
	p.shutdown()
	if !p.isExited() {
		l.Error("shutdown plugin process failed")
		return
	}
	delete(mgr.pluginProcessMap, name)
}

func (mgr *PluginProcessMgr) unregisterAllProcess() {
	l.Infof("shutdown all plugin process")
	for name, p := range mgr.pluginProcessMap {
		l.Infof("shutdown plugin process %s", name)
		mgr.unregisterProcess(name, p)
	}
}