monitor_daemon.go 13.5 KB
Newer Older
“李磊”'s avatar
“李磊” committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package monitor_daemon

import (
	"bufio"
	"errors"
	"fmt"
	"io"
	"os"
	"os/exec"
	"os/signal"
	"path/filepath"
	"strings"
	"syscall"
	"time"

	"linkfog.com/public/lib/common"
Lei Li's avatar
Lei Li committed
17
	"linkfog.com/public/lib/file"
“李磊”'s avatar
“李磊” committed
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
	"linkfog.com/public/lib/l"
)

// Bounds for the timer that drives the periodic child-process check.
const (
	// defaultTimerInterval is the check period New starts with.
	defaultTimerInterval = 60 * time.Second
	// minTimerInterval is the smallest period WithTimerInterval accepts;
	// WithMaxInterval also ignores values that do not exceed it.
	minTimerInterval = 10 * time.Second
)

// IsProcAbnormalCallback is a user-supplied health probe. The daemon calls it
// on every timer tick while the child is alive and kills the child (or its
// process group) when it returns true.
type IsProcAbnormalCallback func() bool

// ForwardSignalType selects how the daemon forwards received signals to the
// child it supervises.
type ForwardSignalType int

// Supported values of ForwardSignalType.
const (
	NoForwardSignal         ForwardSignalType = iota // do not forward signals
	ForwardSignalChild                               // forward signals to the child process
	ForwardSignalChildGroup                          // forward signals to the child's process group

	ForwardSignalInvalid // sentinel: not a valid configuration value
)

type MonitorDaemon struct {
	signalSize        int
	forwardSignalType ForwardSignalType

	interval        time.Duration
	maxRetryTimes   int
	agentBackupPath string
	restartInterval *restartInterval
	cmdline         string
	childPid        int
Lei Li's avatar
Lei Li committed
47
48
	agentMd5        string
	cmdline0        string
“李磊”'s avatar
“李磊” committed
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151

	isProcAbnormalCallback IsProcAbnormalCallback
}

// MonitorDaemonOpt is a functional option that adjusts a MonitorDaemon; New
// applies the given options in order before the child is first started.
type MonitorDaemonOpt func(*MonitorDaemon)

// WithSignalSize sets the capacity of the buffered channel on which the daemon
// receives OS signals.
//
// Non-positive sizes are rejected and the previously configured capacity is
// kept: a negative capacity makes make(chan) panic, and because signal.Notify
// never blocks when delivering, a zero-capacity channel can silently drop
// signals.
func WithSignalSize(size int) MonitorDaemonOpt {
	return func(m *MonitorDaemon) {
		if size <= 0 {
			l.Warnf("signal size %d is not positive, keep %d", size, m.signalSize)
			return
		}
		m.signalSize = size
	}
}

// WithMaxInterval sets the upper bound of the restart interval. Once it is set
// the interval between restarts may grow step by step, which keeps the child
// from being restarted too frequently. Values that do not exceed
// minTimerInterval are ignored.
func WithMaxInterval(maxInterval time.Duration) MonitorDaemonOpt {
	apply := func(d *MonitorDaemon) {
		if maxInterval <= minTimerInterval {
			return
		}
		d.restartInterval.setMaxInterval(maxInterval)
	}
	return apply
}

// WithIncrInterval sets the step by which the restart interval grows each time.
func WithIncrInterval(incrInterval time.Duration) MonitorDaemonOpt {
	apply := func(d *MonitorDaemon) {
		d.restartInterval.setIncr(incrInterval)
	}
	return apply
}

// WithMaxRetryTimes sets how many failed or suspicious restarts are tolerated
// before the daemon attempts a rollback.
func WithMaxRetryTimes(maxRetryTimes int) MonitorDaemonOpt {
	apply := func(d *MonitorDaemon) {
		d.maxRetryTimes = maxRetryTimes
	}
	return apply
}

// WithAgentBackupPath sets the path of the backup executable that is moved
// over the monitored executable when a rollback is performed.
func WithAgentBackupPath(path string) MonitorDaemonOpt {
	apply := func(d *MonitorDaemon) {
		d.agentBackupPath = path
	}
	return apply
}

// WithForwardSignalType selects the signal forwarding mode. Only
// ForwardSignalChild and ForwardSignalChildGroup are accepted; any other value
// leaves the current mode untouched.
func WithForwardSignalType(typeF ForwardSignalType) MonitorDaemonOpt {
	return func(d *MonitorDaemon) {
		switch typeF {
		case ForwardSignalChild, ForwardSignalChildGroup:
			d.forwardSignalType = typeF
		}
	}
}

// WithTimerInterval sets the period of the liveness check. Values below
// minTimerInterval are raised to minTimerInterval and a warning is logged.
func WithTimerInterval(interval time.Duration) MonitorDaemonOpt {
	return func(d *MonitorDaemon) {
		if interval < minTimerInterval {
			d.interval = minTimerInterval
			l.Warnf("%v is less than minTimerInterval(%v), use %v", interval, minTimerInterval, minTimerInterval)
			return
		}
		d.interval = interval
	}
}

// WithIsProcAbnormalCallback registers the health probe that is consulted on
// every timer tick while the child is alive.
func WithIsProcAbnormalCallback(callback IsProcAbnormalCallback) MonitorDaemonOpt {
	apply := func(d *MonitorDaemon) {
		d.isProcAbnormalCallback = callback
	}
	return apply
}

//实现容器内进程的监控与管理,上层调用前,需开启独立的日志功能
/*
* 1. 启动进程:
*	a. 新进程加入独立进程组,用于信号转发;
*	b. 新进程标准输入输出与当前进程一样,保证kubectl可以拿到日志
* 2. 进程监控:定时监控子进程,如果进程不存在负责启动进程。(监控子进程,最小间隔时间为1分钟)
* 3. 信号处理和转发(可配置两种转发模式):
*	a. ForwardSignalChild: 转发信号到子进程
*	b. ForwardSignalChildGroup: 转发信号到子进程组
*	c. 目前不实现转发信号到容器内所有进程,风险过高。
* 4. 回收孙子进程,防止产生僵尸进程:
*	a. 当monitor是容器内1号进程,可回收容器内所有进程。
*	b. 非容器1号进程时,只处理子进程退出信号。
* 5. 杀死子进程或子进程组
*	a. 提供回调函数注册机制,monitor定时调用此函数,根据返回值判断是否杀死子进程
*	b. 若配置了ForwardSignalChildGroup,则杀死整个子进程组
 */
func New(cmdline string, opts ...MonitorDaemonOpt) (*MonitorDaemon, error) {
	if cmdline == "" || !filepath.IsAbs(cmdline) {
		return nil, fmt.Errorf("cmdline is invalid")
	}

	m := MonitorDaemon{
		signalSize:        10,
		forwardSignalType: NoForwardSignal,

		interval:        defaultTimerInterval,
		restartInterval: newRestartInterval(),
		cmdline:         cmdline,
		childPid:        0,

		isProcAbnormalCallback: nil,
	}

	for _, opt := range opts {
		opt(&m)
	}

Lei Li's avatar
Lei Li committed
152
153
154
155
156
157
158
159
	m.cmdline0 = strings.Split(m.cmdline, " ")[0]
	md5, err := file.GetFileMD5(m.cmdline0)
	if err != nil {
		l.Errorf("calculation of agent MD5(%s) failed err:%s", m.cmdline0, err)
	} else {
		m.agentMd5 = md5
	}

“李磊”'s avatar
“李磊” committed
160
	//第一次启动进程
Lei Li's avatar
Lei Li committed
161
	err = m.startProcess()
“李磊”'s avatar
“李磊” committed
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
	if err != nil {
		l.Errorf("start child process failed %s err:%v", cmdline, err)
		return nil, err
	}

	return &m, nil
}

// Run blocks and drives the daemon; it only returns after one of the
// registered termination signals has been received.
//
// SIGCHLD is always registered. Of the signals passed in, only syscall.SIGINT,
// syscall.SIGTERM and syscall.SIGKILL are accepted for registration and
// forwarding; everything else (including nil or non-syscall signal values) is
// ignored. Note that the operating system never delivers SIGKILL to a handler,
// so registering it has no practical effect.
func (m *MonitorDaemon) Run(signals ...os.Signal) {
	sigs := []os.Signal{
		syscall.SIGCHLD,
	}

	for _, sig := range signals {
		// Checked assertion: an unchecked one panics on nil or on a custom
		// os.Signal implementation.
		sysSig, ok := sig.(syscall.Signal)
		if !ok {
			continue
		}
		switch sysSig {
		case syscall.SIGINT, syscall.SIGTERM, syscall.SIGKILL:
			sigs = append(sigs, sig)
		}
	}

	m.runWithSignal(sigs)
}

func (m *MonitorDaemon) runWithSignal(sigs []os.Signal) {
	l.Infof("tick: %+v", m.interval)
	tick := time.NewTimer(m.interval)
	defer tick.Stop()

	var notifications = make(chan os.Signal, m.signalSize)
	signal.Notify(notifications, sigs...)

	errRetryTimes := 0
	for {
		select {
		case sig := <-notifications:
			if sig == syscall.SIGCHLD {
				// 当子进程收到SIGSTOP和SIGCONT信号时,monitor会接收到SIGCHLD,因此使用debug日志记录
				l.Debug("monitor_daemon recv sig, sig:", sig)
				for {
					killPid, err := reapChildrenSig()
					if err != nil {
						l.Errorf("monitor_daemon reapChildrenSig error:%v", err)
						break
					} else if killPid <= 0 {
						//当子进程收到SIGSTOP和SIGCONT信号时,wait4获取到killPid为0,表示子进程状态未发生改变
						break
					}

					//如果wait的pid是子进程,并且开启了转发信号开关,则进行信号发送到子进程所在的进程组
					if killPid == m.childPid && ForwardSignalChildGroup == m.forwardSignalType {
						l.Infof("monitor_daemon forwarded signal SIGCHLD to children group %d", m.childPid)
						err := syscall.Kill(-m.childPid, syscall.SIGTERM)
						if err != nil {
							l.Errorf("monitor_daemon forwarded signal SIGCHLD, send syscall.SIGTERM to children group %d, err %v", m.childPid, err)
						}
					} else {
						l.Infof("monitor_daemon not forwarded signal SIGCHLD, killPid: %d, childPid: %d", killPid, m.childPid)
					}
				}
			} else {
				l.Info("monitor_daemon recv sig, sig:", sig)
				//TODO: signal check
				if ForwardSignalChild == m.forwardSignalType {
					l.Infof("forwarded signal %d to children %d", sig, m.childPid)
					err := syscall.Kill(m.childPid, sig.(syscall.Signal))
					if err != nil {
						l.Errorf("forwarded signal %d to children %d, err %v", sig, m.childPid, err)
					}
				} else if ForwardSignalChildGroup == m.forwardSignalType {
					l.Infof("forwarded signal %d to children group %d", sig, m.childPid)
					err := syscall.Kill(-m.childPid, sig.(syscall.Signal))
					if err != nil {
						l.Errorf("forwarded signal %d to children group %d, err %v", sig, m.childPid, err)
					}
				}

				l.Infof("exit monitor_daemon pid: %d, child pid: %d, signal: %d", os.Getpid(), m.childPid, sig)
				return
			}
		case <-tick.C:
			isAlive := m.monitorProcess()

			// 时间间隔未达到允许的时间间隔
			if !isAlive && !m.restartInterval.isNeedStart() {
				l.Infof("process will restart, curInterval=%s last=%s", m.restartInterval.GetCurInterval(),
					m.restartInterval.GetLastStart().Format("2006-01-02 15:04:05"))
			}

			if !isAlive && m.restartInterval.isNeedStart() {
Lei Li's avatar
Lei Li committed
257
258

				md5, err := file.GetFileMD5(m.cmdline0)
“李磊”'s avatar
“李磊” committed
259
				if err != nil {
Lei Li's avatar
Lei Li committed
260
261
262
263
264
					l.Errorf("calculation of agent MD5(%s) failed err:%s", m.cmdline0, err)
				}
				err = m.startProcess()
				if err != nil {
					// 异常情况,直接无法启动,进行回滚操作
“李磊”'s avatar
“李磊” committed
265
266
					errRetryTimes++
					l.Errorf("monitorProcess start child process failed %s, errRetryTimes:%d, err:%v", m.cmdline, errRetryTimes, err)
Lei Li's avatar
Lei Li committed
267
268
269
270
271
272
273
274
275
276
					if md5 != m.agentMd5 && errRetryTimes > m.maxRetryTimes {
						m.rollbackForAbnormalCase()
						errRetryTimes = 0
					}
				} else {
					if md5 == m.agentMd5 {
						errRetryTimes++
						// 异常情况,启动几秒后停止,进行回滚操作
						if errRetryTimes > m.maxRetryTimes {
							m.rollbackForAbnormalCase()
“李磊”'s avatar
“李磊” committed
277
278
							errRetryTimes = 0
						}
Lei Li's avatar
Lei Li committed
279
280
					} else {
						m.agentMd5 = md5
“李磊”'s avatar
“李磊” committed
281
					}
Lei Li's avatar
Lei Li committed
282
					l.Infof("monitorProcess start child process success pid: %d, errRetryTimes:%d", m.childPid, errRetryTimes)
“李磊”'s avatar
“李磊” committed
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
				}
				m.restartInterval.setLastStart()
			}

			//当回调返回true时,进程异常,直接杀死子进程或子进程组
			if isAlive && m.isProcAbnormalCallback != nil {
				isAbnormal := m.isProcAbnormalCallback()
				if isAbnormal {
					m.killProcess()
				}
			}

			tick.Reset(m.interval)
		}
	}
}

// killProcess sends SIGTERM to the monitored child, or to its whole process
// group when ForwardSignalChildGroup is configured.
//
// Before signalling, the executable behind childPid is compared with the
// command line so that a recycled pid is not killed by mistake. An empty
// executable path is treated as "cannot verify": strings.HasPrefix matches an
// empty prefix against anything, which would otherwise let the kill through.
func (m *MonitorDaemon) killProcess() {
	pid := m.childPid
	if pid == 0 {
		l.Errorf("killProcess child process pid is not exist, pid: %d, cmdline: %s", m.childPid, m.cmdline)
		return
	}

	childExec := getProcExec(fmt.Sprintf("%d", pid))
	if childExec == "" || !strings.HasPrefix(m.cmdline, childExec) {
		l.Errorf("killProcess child process exec is not exist, pid: %d, exec:%s, cmdline: %s", m.childPid, childExec, m.cmdline)
		return
	}

	killPid := pid
	if ForwardSignalChildGroup == m.forwardSignalType {
		// A negative pid addresses the whole process group.
		killPid = -pid
		l.Infof("killProcess child process forwarded signal SIGTERM to group %d", m.childPid)
	}

	l.Infof("killProcess child process kill SIGTERM, pid: %d, exec:%s, cmdline: %s", m.childPid, childExec, m.cmdline)
	if err := syscall.Kill(killPid, syscall.SIGTERM); err != nil {
		l.Errorf("killProcess child process kill SIGTERM error: %v", err)
	}
}

// monitorProcess reports whether the monitored child is still running.
//
// The child counts as alive only when childPid is set, the executable behind
// that pid is a prefix of the configured command line, and signal 0 can be
// delivered to it. An empty executable path is treated as "not running":
// strings.HasPrefix matches an empty prefix against anything, and signal 0
// also succeeds for a not-yet-reaped zombie, so the pair would otherwise
// report a dead child as alive.
func (m *MonitorDaemon) monitorProcess() bool {
	pid := m.childPid
	if pid == 0 {
		return false
	}

	childExec := getProcExec(fmt.Sprintf("%d", pid))
	if childExec == "" || !strings.HasPrefix(m.cmdline, childExec) {
		l.Errorf("monitorProcess child process is not exist, pid: %d, exec:%s, cmdline: %s", m.childPid, childExec, m.cmdline)
		return false
	}

	// Signal 0 performs the existence/permission check without sending anything.
	if err := syscall.Kill(pid, 0); err != nil {
		l.Errorf("monitorProcess child process kill 0 error: %v, pid: %d, exec:%s, cmdline: %s", err, m.childPid, childExec, m.cmdline)
		return false
	}

	l.Infof("monitorProcess child process is alive, pid: %d, exec:%s, cmdline: %s", m.childPid, childExec, m.cmdline)
	return true
}

// startProcess launches the configured command line and, on success, records
// the pid of the new child in childPid. The start error is returned unchanged.
func (m *MonitorDaemon) startProcess() error {
	started, startErr := startProc(m.cmdline)
	switch {
	case startErr != nil:
		return startErr
	case started.Process != nil:
		m.childPid = started.Process.Pid
	}
	return nil
}

// getProcExec resolves the /proc/<pid>/exe symlink of the given pid through
// common.ReadLink and returns whatever that helper yields.
func getProcExec(pid string) string {
	exeLink := "/proc/" + pid + "/exe"
	return common.ReadLink(exeLink)
}

// startProc runs cmdline through "/bin/bash -c" and returns the started command.
//
// The child is placed in its own process group so the parent can forward
// signals to the group. Its standard output is the daemon's standard output,
// so kubectl can collect the logs. Its standard error is captured through a
// pipe and written via the project logger; if the pipe cannot be created the
// child inherits the daemon's standard error instead.
//
// The stderr reader goroutine is started only after the process has started,
// so a failed start does not leave a goroutine reading an already closed pipe.
func startProc(cmdline string) (*exec.Cmd, error) {
	cmd := exec.Command("/bin/bash", "-c", cmdline)

	// Own process group, used by the parent for signal forwarding.
	cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}

	cmd.Stdout = os.Stdout
	stderrReader, pipeErr := cmd.StderrPipe()
	if pipeErr != nil {
		l.Error("cmd create stderr pipe err:", pipeErr)
		cmd.Stderr = os.Stderr
	}

	if err := cmd.Start(); err != nil {
		return nil, err
	}

	if pipeErr == nil {
		go readStderr(stderrReader)
	}

	return cmd, nil
}

// reapChildrenSig reaps one exited descendant without blocking, so that
// zombies do not accumulate. It is effective when the daemon is PID 1 of the
// container, where it collects children and grandchildren alike.
//
// WARNING: it waits for ANY child (pid -1) and therefore must not be combined
// with other code that performs its own wait on child processes.
//
// It returns the value reported by wait4: the reaped pid (> 0), 0 when no
// child has changed state, or an error (for example ECHILD when there is no
// child at all). Interrupted calls are retried.
func reapChildrenSig() (int, error) {
	// -1 selects any child process. It is kept in a constant so a retry never
	// reuses the return value of the previous call as its selector.
	const anyChild = -1

	var wstatus syscall.WaitStatus

	pid, err := syscall.Wait4(anyChild, &wstatus, syscall.WNOHANG, nil)
	for err == syscall.EINTR {
		pid, err = syscall.Wait4(anyChild, &wstatus, syscall.WNOHANG, nil)
	}

	if pid > 0 {
		l.Infof("reaper cleanup: pid=%d, wstatus=%+v, msg=%v",
			pid, wstatus, waitStatusToError(wstatus))
	}

	return pid, err
}

// waitStatusToError turns a wait status into a descriptive error.
//
// A normal exit with status 0 yields nil; a non-zero exit yields the status as
// text. Termination by signal yields "signaled <sig>", with " (core dumped)"
// appended when a core was written. A stopped process yields "stopped <sig>",
// with the trap cause appended when there is one. Anything else is reported as
// an unknown wait status.
func waitStatusToError(wstatus syscall.WaitStatus) error {
	if wstatus.Exited() {
		if code := wstatus.ExitStatus(); code != 0 {
			return errors.New(fmt.Sprint(code))
		}
		return nil
	}

	if wstatus.Signaled() {
		desc := fmt.Sprintf("signaled %v", wstatus.Signal())
		if wstatus.CoreDump() {
			desc += " (core dumped)"
		}
		return errors.New(desc)
	}

	if wstatus.Stopped() {
		desc := fmt.Sprintf("stopped %v", wstatus.StopSignal())
		if cause := wstatus.TrapCause(); cause != -1 {
			desc += fmt.Sprintf(" (trapped %v)", cause)
		}
		return errors.New(desc)
	}

	return fmt.Errorf("unknown WaitStatus %d", wstatus)
}

// readStderr drains stderrReader until EOF or a read error and writes every
// non-empty line to the error log.
//
// Lines are read with bufio.Reader.ReadLine using a fixed 64 KiB buffer: a
// line longer than the buffer is logged in several chunks instead of aborting
// the loop. A bufio.Scanner would stop at its token-size limit, after which
// nobody would drain the pipe and the child could block on writes to stderr.
func readStderr(stderrReader io.Reader) {
	reader := bufio.NewReaderSize(stderrReader, 64*1024)
	for {
		// ReadLine strips the line terminator; the isPrefix flag is not needed
		// because every chunk is logged as soon as it is read.
		chunk, _, err := reader.ReadLine()
		if err != nil {
			if err != io.EOF {
				l.Error("read stderr err:", err)
			}
			return
		}
		if len(chunk) > 0 {
			l.Error(string(chunk))
		}
	}
}

Lei Li's avatar
Lei Li committed
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
// rollbackForAbnormalCase restores the monitored executable from the backup.
//
// Nothing happens when the backup path does not exist or when the executable
// path is empty. Otherwise the backup is renamed over the executable and, on
// success, the recorded MD5 is refreshed from the restored file (a failure to
// compute it is only logged).
func (m *MonitorDaemon) rollbackForAbnormalCase() {
	if _, statErr := os.Stat(m.agentBackupPath); os.IsNotExist(statErr) {
		return
	}
	if m.cmdline0 == "" {
		return
	}

	if renameErr := os.Rename(m.agentBackupPath, m.cmdline0); renameErr != nil {
		l.Errorf("monitorProcess rollback agent process failed err:%v", renameErr)
		return
	}

	sum, sumErr := file.GetFileMD5(m.cmdline0)
	if sumErr != nil {
		l.Errorf("calculation of agent MD5(%s) failed err:%s", m.cmdline0, sumErr)
	} else {
		m.agentMd5 = sum
	}
	l.Infof("monitorProcess rollback agent process successful md5:%s", sum)
}