plugin_proc_mgr.go 4.23 KB
Newer Older
“李磊”'s avatar
“李磊” committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package pluginmgr

import (
	"sync"
	"time"

	"linkfog.com/public/lib/l"
)

// PluginProcessMgr supervises plugin processes. It keeps the desired
// configuration per plugin name and the currently tracked process per plugin
// name, and a background goroutine launched by Start periodically reconciles
// the two (see detection).
//
// Two locks are used: mu guards the two maps, while the embedded sync.Mutex
// serializes Start and Stop.
type PluginProcessMgr struct {
	// PluginProcessConfMap is the desired configuration, keyed by plugin name.
	PluginProcessConfMap map[string]*PluginProcessConf
	// pluginProcessMap holds the processes currently tracked, keyed by plugin name.
	pluginProcessMap     map[string]*pluginProcess
	mu                   *sync.Mutex // guards PluginProcessConfMap and pluginProcessMap
	// done is created by Start and closed by Stop to ask the run loop to exit.
	done                 chan struct{}
	// doneResp is created by Start and closed by the run loop when it returns;
	// Stop blocks on it.
	doneResp             chan struct{}
	// isRunning is set by Start and cleared by Stop, under the embedded mutex.
	isRunning            bool
	sync.Mutex           // serializes Start and Stop
}

// NewProcessMgr builds a manager for the given plugin configuration.
//
// conf is stored as-is (not copied), so the manager shares the map with the
// caller. The process table starts empty and no goroutine is launched until
// Start is called.
func NewProcessMgr(conf map[string]*PluginProcessConf) *PluginProcessMgr {
	return &PluginProcessMgr{
		mu:                   new(sync.Mutex),
		pluginProcessMap:     map[string]*pluginProcess{},
		PluginProcessConfMap: conf,
	}
}

// Start launches the background supervision loop in its own goroutine.
//
// It is a no-op when the manager is already running. The done/doneResp
// channels are recreated on every start so that the manager can be started
// again after a Stop.
func (mgr *PluginProcessMgr) Start() {
	mgr.Lock()
	defer mgr.Unlock()

	if !mgr.isRunning {
		mgr.isRunning = true
		mgr.done, mgr.doneResp = make(chan struct{}), make(chan struct{})

		l.Info("plugin process mgr start")
		go mgr.run()
	}
}

// Stop terminates the supervision loop, waits for it to exit, then shuts down
// every tracked plugin process and resets both maps to empty.
//
// It is a no-op when the manager is not running. The elapsed time of the whole
// stop sequence is logged.
//
// NOTE(review): the process map is replaced even when unregistering a process
// failed, so such a process is no longer tracked afterwards — confirm this is
// intended.
func (mgr *PluginProcessMgr) Stop() {
	mgr.Lock()
	defer mgr.Unlock()

	if mgr.isRunning {
		begin := time.Now()

		// Signal the run loop and block until it has acknowledged by closing
		// doneResp, so no detection pass can overlap with the cleanup below.
		mgr.isRunning = false
		close(mgr.done)
		<-mgr.doneResp

		mgr.mu.Lock()
		defer mgr.mu.Unlock()

		mgr.unregisterAllProcess()
		mgr.PluginProcessConfMap = map[string]*PluginProcessConf{}
		mgr.pluginProcessMap = map[string]*pluginProcess{}
		l.Infof("plugin process mgr has stopped, cost %v", time.Since(begin))
	}
}

func (mgr *PluginProcessMgr) run() {
	mgr.detection()
66
	ticker := time.NewTicker(InitDur)
“李磊”'s avatar
“李磊” committed
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
	defer ticker.Stop()
	defer close(mgr.doneResp)
	for {
		select {
		case <-ticker.C:
			mgr.detection()
		case <-mgr.done:
			l.Info("plugin process mgr received end signal")
			return
		}
	}
}

// detection performs one reconciliation pass between the configured plugins
// and the tracked plugin processes. mgr.mu is held for the whole pass.
//
// The pass has two phases:
//  1. every tracked process whose name is no longer present in
//     PluginProcessConfMap is unregistered;
//  2. for every configuration entry: a disabled plugin is unregistered if a
//     process is tracked for it; an enabled plugin with no tracked process is
//     registered; an exited process is registered again; and a process whose
//     RSS exceeds conf.Mem is shut down and registered again.
//
// Process lookups use conf.Name rather than the configuration map key.
func (mgr *PluginProcessMgr) detection() {
	mgr.mu.Lock()
	defer mgr.mu.Unlock()

	// Using the process map and the config map, remove plugin processes that
	// are no longer configured.
	for name, p := range mgr.pluginProcessMap {
		if _, ok := mgr.PluginProcessConfMap[name]; !ok {
			l.Infof("unregister useless plugin process %s", name)
			mgr.unregisterProcess(name, p)
		}
	}

	// Using the config map and the process map, bring process state in sync
	// with the configuration.
	for _, conf := range mgr.PluginProcessConfMap {
		if !conf.Enable {
			// The plugin is disabled but its process still exists: shut it down.
			if p, ok := mgr.pluginProcessMap[conf.Name]; ok {
				l.Infof("plugin process %s disable, unregister", conf.Name)
				mgr.unregisterProcess(conf.Name, p)
			}
			continue
		}

		// The plugin process does not exist yet: start it.
		p, ok := mgr.pluginProcessMap[conf.Name]
		if !ok {
			l.Infof("register plugin process %s", conf.Name)
			mgr.registerProcess(conf)
			continue
		}

		// The plugin process exited abnormally: restart it.
		if p.isExited() {
			l.Warnf("plugin process %s abnormal exited, restart", conf.Name)
			mgr.registerProcess(conf)
			continue
		}

		// Read the resident set size (RSS) of the plugin process.
		rss, err := GetProcRSS(p.pid())
		if err != nil {
			l.Warnf("get plugin process %s memory rss err: %v", conf.Name, err)
			continue
		}
		l.Infof("plugin process %s memory rss: %d", conf.Name, rss)

		// The plugin process RSS exceeded its limit: restart it.
		if rss > conf.Mem {
			// Warnf, not Warn: the message carries format directives.
			l.Warnf("plugin process %s memory rss(%d) reach limit(%d), restart", conf.Name, rss, conf.Mem)
			p.shutdown()
			mgr.registerProcess(conf)
		}
	}
}

// ReloadConf swaps the whole plugin configuration for conf while holding
// mgr.mu.
//
// The map is stored as-is (not copied). This method only replaces the map;
// processes are not started or stopped here.
func (mgr *PluginProcessMgr) ReloadConf(conf map[string]*PluginProcessConf) {
	mgr.mu.Lock()
	mgr.PluginProcessConfMap = conf
	mgr.mu.Unlock()
}

Lei Li's avatar
Lei Li committed
141
142
143
144
145
146
// ChangeSinglePluginConf adds or replaces the configuration of one plugin,
// keyed by conf.Name, while holding mgr.mu.
//
// conf must be non-nil. This method only updates the configuration map;
// processes are not started or stopped here.
func (mgr *PluginProcessMgr) ChangeSinglePluginConf(conf *PluginProcessConf) {
	mgr.mu.Lock()
	defer mgr.mu.Unlock()

	// The map is nil when the manager was created or reloaded with a nil
	// configuration; assigning into a nil map panics, so allocate it lazily.
	if mgr.PluginProcessConfMap == nil {
		mgr.PluginProcessConfMap = make(map[string]*PluginProcessConf)
	}
	mgr.PluginProcessConfMap[conf.Name] = conf
}

“李磊”'s avatar
“李磊” committed
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
// registerProcess creates a process for conf via newProcess and records it in
// pluginProcessMap under conf.Name, replacing any previous entry.
//
// When creation fails the error is logged and the map is left unchanged.
// This method does not lock mgr.mu itself.
func (mgr *PluginProcessMgr) registerProcess(conf *PluginProcessConf) {
	if proc, err := newProcess(conf); err != nil {
		l.Errorf("new plugin process %s err: %v", conf.Name, err)
	} else {
		mgr.pluginProcessMap[conf.Name] = proc
	}
}

// unregisterProcess shuts down p and, once p reports that it has exited,
// removes name from pluginProcessMap.
//
// If p has not exited after shutdown, the failure is logged together with the
// plugin name and the map entry is kept. This method does not lock mgr.mu
// itself.
func (mgr *PluginProcessMgr) unregisterProcess(name string, p *pluginProcess) {
	p.shutdown()
	if !p.isExited() {
		// Include the plugin name so the failing process can be identified.
		l.Errorf("shutdown plugin process %s failed", name)
		return
	}
	delete(mgr.pluginProcessMap, name)
}

// unregisterAllProcess unregisters every process currently in
// pluginProcessMap, logging each plugin name as it goes.
//
// This method does not lock mgr.mu itself.
func (mgr *PluginProcessMgr) unregisterAllProcess() {
	l.Infof("shutdown all plugin process")
	for pluginName, proc := range mgr.pluginProcessMap {
		l.Infof("shutdown plugin process %s", pluginName)
		mgr.unregisterProcess(pluginName, proc)
	}
}
172
173
174
175
176
177
178
179
180
181

// AllPluginProcessIsRunning reports whether none of the tracked plugin
// processes has exited. It returns true when no process is tracked.
//
// mgr.mu is taken for the duration of the check because the supervision
// goroutine mutates pluginProcessMap under that lock. Callers must therefore
// not already hold mgr.mu.
func (mgr *PluginProcessMgr) AllPluginProcessIsRunning() bool {
	mgr.mu.Lock()
	defer mgr.mu.Unlock()

	for _, p := range mgr.pluginProcessMap {
		if p.isExited() {
			return false
		}
	}

	return true
}