// Package monitor_daemon supervises a single child process: it starts the
// process, periodically checks that it is still running, restarts it when it
// is gone, reaps exited children, optionally forwards termination signals,
// and can roll the executable back to a backup copy after repeated restarts.
package monitor_daemon

import (
	"bufio"
	"errors"
	"fmt"
	"io"
	"os"
	"os/exec"
	"os/signal"
	"path/filepath"
	"strings"
	"syscall"
	"time"

	"linkfog.com/public/lib/common"
	"linkfog.com/public/lib/file"
	"linkfog.com/public/lib/l"
)

const (
	// defaultTimerInterval is the monitoring period used when no
	// WithTimerInterval option is supplied.
	defaultTimerInterval time.Duration = 60 * time.Second
	// minTimerInterval is the smallest monitoring period WithTimerInterval
	// accepts; smaller values are clamped to it.
	minTimerInterval time.Duration = 10 * time.Second
)

// IsProcAbnormalCallback reports whether the supervised process should be
// considered abnormal. When it returns true the monitor terminates the child
// (or its process group).
type IsProcAbnormalCallback func() bool

// ForwardSignalType selects how signals received by the monitor are passed on
// to the supervised process.
type ForwardSignalType int

const (
	// NoForwardSignal does not forward signals.
	NoForwardSignal ForwardSignalType = iota
	// ForwardSignalChild forwards signals to the child process only.
	ForwardSignalChild
	// ForwardSignalChildGroup forwards signals to the child's process group.
	ForwardSignalChildGroup
	// ForwardSignalInvalid marks the end of the valid range; it is not a
	// usable configuration value.
	ForwardSignalInvalid
)

// MonitorDaemon holds the configuration and runtime state of one supervised
// child process.
type MonitorDaemon struct {
	signalSize        int               // buffer size of the signal notification channel
	forwardSignalType ForwardSignalType // how received signals are forwarded
	interval          time.Duration     // period of the monitoring timer
	maxRetryTimes     int               // restart count that must be exceeded before a rollback
	agentBackupPath   string            // backup executable renamed over cmdline0 on rollback
	restartInterval   *restartInterval  // restart throttling state (type defined elsewhere in the package)

	cmdline                string // full command line handed to "/bin/bash -c"
	childPid               int    // pid of the most recently started child, 0 if none
	agentMd5               string // MD5 of the file at cmdline0 as last recorded
	cmdline0               string // first space-separated token of cmdline
	isProcAbnormalCallback IsProcAbnormalCallback
}

// MonitorDaemonOpt customises a MonitorDaemon during New.
type MonitorDaemonOpt func(*MonitorDaemon)

// WithSignalSize sets the buffer size of the signal notification channel.
// Non-positive sizes are ignored: a negative size would panic when the channel
// is created, and an unbuffered channel would drop signals because
// signal.Notify never blocks when delivering.
func WithSignalSize(size int) MonitorDaemonOpt {
	return func(m *MonitorDaemon) {
		if size <= 0 {
			l.Warnf("signal size %d is invalid, keep %d", size, m.signalSize)
			return
		}
		m.signalSize = size
	}
}

// WithMaxInterval sets the maximum restart interval. Once set, the restart
// interval may grow step by step, which prevents rapid restart loops. Values
// not greater than minTimerInterval are ignored.
func WithMaxInterval(maxInterval time.Duration) MonitorDaemonOpt {
	return func(m *MonitorDaemon) {
		if maxInterval > minTimerInterval {
			m.restartInterval.setMaxInterval(maxInterval)
		}
	}
}

// WithIncrInterval sets the amount by which the restart interval grows each
// time.
func WithIncrInterval(incrInterval time.Duration) MonitorDaemonOpt {
	return func(m *MonitorDaemon) {
		m.restartInterval.setIncr(incrInterval)
	}
}

// WithMaxRetryTimes sets how many counted restarts are tolerated before the
// monitor attempts a rollback of the executable.
func WithMaxRetryTimes(maxRetryTimes int) MonitorDaemonOpt {
	return func(m *MonitorDaemon) {
		m.maxRetryTimes = maxRetryTimes
	}
}

// WithAgentBackupPath sets the path of the backup executable used by the
// rollback step.
func WithAgentBackupPath(path string) MonitorDaemonOpt {
	return func(m *MonitorDaemon) {
		m.agentBackupPath = path
	}
}

// WithForwardSignalType selects the signal forwarding mode. Only
// ForwardSignalChild and ForwardSignalChildGroup are accepted; any other value
// leaves the current setting untouched.
func WithForwardSignalType(typeF ForwardSignalType) MonitorDaemonOpt {
	return func(m *MonitorDaemon) {
		if typeF < ForwardSignalInvalid && typeF > NoForwardSignal {
			m.forwardSignalType = typeF
		}
	}
}

// WithTimerInterval sets the monitoring period. Values below minTimerInterval
// are replaced by minTimerInterval and a warning is logged.
func WithTimerInterval(interval time.Duration) MonitorDaemonOpt {
	return func(m *MonitorDaemon) {
		if interval >= minTimerInterval {
			m.interval = interval
			return
		}
		m.interval = minTimerInterval
		l.Warnf("%v is less than minTimerInterval(%v), use %v", interval, minTimerInterval, minTimerInterval)
	}
}

// WithIsProcAbnormalCallback registers the callback the monitor polls on every
// tick while the child is alive.
func WithIsProcAbnormalCallback(callback IsProcAbnormalCallback) MonitorDaemonOpt {
	return func(m *MonitorDaemon) {
		m.isProcAbnormalCallback = callback
	}
}

// New creates a MonitorDaemon for cmdline and starts the child process for the
// first time. cmdline must be non-empty and begin with an absolute path. The
// caller is expected to have set up its own logging before calling New.
//
// The monitor implements supervision of a process inside a container:
//
//  1. Starting the process:
//     a. the new process joins its own process group, used for signal forwarding;
//     b. its standard output is the monitor's standard output, so that kubectl
//     can collect the logs.
//  2. Monitoring: the child is checked on a timer and started again when it no
//     longer exists. The period defaults to 60s and cannot be set below 10s.
//  3. Signal handling and forwarding (two configurable modes):
//     a. ForwardSignalChild: forward the signal to the child process;
//     b. ForwardSignalChildGroup: forward the signal to the child's process group;
//     c. forwarding to every process in the container is intentionally not
//     implemented because the risk is too high.
//  4. Reaping descendants to avoid zombies:
//     a. when the monitor is PID 1 of the container it can reap every process
//     in the container;
//     b. otherwise only exit notifications of its own children are handled.
//  5. Killing the child or its process group:
//     a. a callback can be registered; the monitor calls it on every tick and
//     kills the child when it returns true;
//     b. with ForwardSignalChildGroup the whole child process group is killed.
//
// It returns an error when cmdline is invalid or the first start fails.
func New(cmdline string, opts ...MonitorDaemonOpt) (*MonitorDaemon, error) {
	if cmdline == "" || !filepath.IsAbs(cmdline) {
		return nil, fmt.Errorf("cmdline is invalid")
	}

	m := MonitorDaemon{
		signalSize:             10,
		forwardSignalType:      NoForwardSignal,
		interval:               defaultTimerInterval,
		restartInterval:        newRestartInterval(),
		cmdline:                cmdline,
		childPid:               0,
		isProcAbnormalCallback: nil,
	}
	for _, opt := range opts {
		// A nil option would panic when called; skip it instead.
		if opt != nil {
			opt(&m)
		}
	}

	// The executable path is everything before the first space.
	m.cmdline0 = strings.Split(m.cmdline, " ")[0]
	md5, err := file.GetFileMD5(m.cmdline0)
	if err != nil {
		l.Errorf("calculation of agent MD5(%s) failed err:%s", m.cmdline0, err)
	} else {
		m.agentMd5 = md5
	}

	// First start of the child process.
	if err = m.startProcess(); err != nil {
		l.Errorf("start child process failed %s err:%v", cmdline, err)
		return nil, err
	}
	return &m, nil
}

// Run blocks and supervises the child until one of the registered termination
// signals arrives. SIGCHLD is always handled. From signals, only
// syscall.SIGINT, syscall.SIGTERM and syscall.SIGKILL are accepted; everything
// else is skipped. Note that the operating system never delivers SIGKILL to a
// handler, so registering it has no practical effect.
func (m *MonitorDaemon) Run(signals ...os.Signal) {
	sigs := []os.Signal{syscall.SIGCHLD}
	for _, sig := range signals {
		// Checked assertion: a nil or non-syscall os.Signal must not panic.
		sysSig, ok := sig.(syscall.Signal)
		if !ok {
			l.Warnf("monitor_daemon ignore unsupported signal: %v", sig)
			continue
		}
		switch sysSig {
		case syscall.SIGINT, syscall.SIGTERM, syscall.SIGKILL:
			sigs = append(sigs, sig)
		}
	}
	m.runWithSignal(sigs)
}

// runWithSignal is the supervision loop. It returns after a signal other than
// SIGCHLD has been received (and forwarded, if forwarding is configured).
func (m *MonitorDaemon) runWithSignal(sigs []os.Signal) {
	l.Infof("tick: %+v", m.interval)
	tick := time.NewTimer(m.interval)
	defer tick.Stop()

	notifications := make(chan os.Signal, m.signalSize)
	signal.Notify(notifications, sigs...)
	// Undo the registration on return so the runtime stops relaying signals
	// into a channel nobody reads any more.
	defer signal.Stop(notifications)

	errRetryTimes := 0
	for {
		select {
		case sig := <-notifications:
			if sig == syscall.SIGCHLD {
				// SIGCHLD also arrives when the child receives SIGSTOP or
				// SIGCONT, hence only a debug-level record.
				l.Debug("monitor_daemon recv sig, sig:", sig)
				m.drainExitedChildren()
				continue
			}

			l.Info("monitor_daemon recv sig, sig:", sig)
			// TODO: signal check
			m.forwardSignalToChild(sig)
			l.Infof("exit monitor_daemon pid: %d, child pid: %d, signal: %d", os.Getpid(), m.childPid, sig)
			return

		case <-tick.C:
			if m.monitorProcess() {
				// The callback returning true means the process is abnormal:
				// kill the child or its process group.
				if m.isProcAbnormalCallback != nil && m.isProcAbnormalCallback() {
					m.killProcess()
				}
			} else {
				// isNeedStart is defined outside this file; it is queried
				// separately for the log line and for the restart decision so
				// that its call pattern stays exactly as before.
				if !m.restartInterval.isNeedStart() {
					// The permitted restart interval has not elapsed yet.
					l.Infof("process will restart, curInterval=%s last=%s", m.restartInterval.GetCurInterval(), m.restartInterval.GetLastStart().Format("2006-01-02 15:04:05"))
				}
				if m.restartInterval.isNeedStart() {
					errRetryTimes = m.restartChild(errRetryTimes)
				}
			}
			tick.Reset(m.interval)
		}
	}
}

// drainExitedChildren reaps every child whose state change is pending. When
// the reaped pid is the supervised child and group forwarding is enabled, the
// rest of the child's process group is sent SIGTERM.
func (m *MonitorDaemon) drainExitedChildren() {
	for {
		killPid, err := reapChildrenSig()
		if err != nil {
			// ECHILD only means there is no child left to wait for, which is
			// the normal end of a reap cycle and not worth an error record.
			if !errors.Is(err, syscall.ECHILD) {
				l.Errorf("monitor_daemon reapChildrenSig error:%v", err)
			}
			return
		}
		if killPid <= 0 {
			// wait4 reports 0 when no child changed state, e.g. after the
			// child received SIGSTOP or SIGCONT.
			return
		}

		if killPid == m.childPid && ForwardSignalChildGroup == m.forwardSignalType {
			l.Infof("monitor_daemon forwarded signal SIGCHLD to children group %d", m.childPid)
			if err := syscall.Kill(-m.childPid, syscall.SIGTERM); err != nil {
				l.Errorf("monitor_daemon forwarded signal SIGCHLD, send syscall.SIGTERM to children group %d, err %v", m.childPid, err)
			}
		} else {
			l.Infof("monitor_daemon not forwarded signal SIGCHLD, killPid: %d, childPid: %d", killPid, m.childPid)
		}
	}
}

// forwardSignalToChild relays sig to the child or to its process group,
// depending on the configured forwarding mode. It does nothing when forwarding
// is disabled.
func (m *MonitorDaemon) forwardSignalToChild(sig os.Signal) {
	toGroup := false
	switch m.forwardSignalType {
	case ForwardSignalChild:
	case ForwardSignalChildGroup:
		toGroup = true
	default:
		return
	}

	sysSig, ok := sig.(syscall.Signal)
	if !ok {
		l.Errorf("forwarded signal %v skipped, not a syscall.Signal", sig)
		return
	}
	if m.childPid <= 0 {
		// kill(0, sig) would signal the monitor's own process group.
		l.Errorf("forwarded signal %d skipped, invalid child pid %d", sysSig, m.childPid)
		return
	}

	if toGroup {
		l.Infof("forwarded signal %d to children group %d", sysSig, m.childPid)
		if err := syscall.Kill(-m.childPid, sysSig); err != nil {
			l.Errorf("forwarded signal %d to children group %d, err %v", sysSig, m.childPid, err)
		}
		return
	}
	l.Infof("forwarded signal %d to children %d", sysSig, m.childPid)
	if err := syscall.Kill(m.childPid, sysSig); err != nil {
		l.Errorf("forwarded signal %d to children %d, err %v", sysSig, m.childPid, err)
	}
}

// restartChild starts the child again and updates the rollback bookkeeping.
// errRetryTimes is the current counter; the updated counter is returned.
func (m *MonitorDaemon) restartChild(errRetryTimes int) int {
	md5, err := file.GetFileMD5(m.cmdline0)
	if err != nil {
		l.Errorf("calculation of agent MD5(%s) failed err:%s", m.cmdline0, err)
	}

	if err = m.startProcess(); err != nil {
		// Abnormal case: the process cannot be started at all; roll back once
		// the executable has changed and the retry budget is used up.
		errRetryTimes++
		l.Errorf("monitorProcess start child process failed %s, errRetryTimes:%d, err:%v", m.cmdline, errRetryTimes, err)
		if md5 != m.agentMd5 && errRetryTimes > m.maxRetryTimes {
			m.rollbackForAbnormalCase()
			errRetryTimes = 0
		}
	} else {
		if md5 == m.agentMd5 {
			// Abnormal case: the same executable keeps stopping shortly after
			// it was started; roll back once the retry budget is used up.
			errRetryTimes++
			if errRetryTimes > m.maxRetryTimes {
				m.rollbackForAbnormalCase()
				errRetryTimes = 0
			}
		} else {
			m.agentMd5 = md5
		}
		l.Infof("monitorProcess start child process success pid: %d, errRetryTimes:%d", m.childPid, errRetryTimes)
	}

	m.restartInterval.setLastStart()
	return errRetryTimes
}

// killProcess sends SIGTERM to the child, or to its whole process group when
// ForwardSignalChildGroup is configured. The signal is only sent when the
// executable behind the recorded pid is a prefix of the configured command
// line.
func (m *MonitorDaemon) killProcess() {
	pid := m.childPid
	if pid == 0 {
		l.Errorf("killProcess child process pid is not exist, pid: %d, cmdline: %s", m.childPid, m.cmdline)
		return
	}

	childExec := getProcExec(fmt.Sprintf("%d", pid))
	// NOTE(review): if common.ReadLink yields "" on failure, the prefix check
	// below passes for any pid — confirm whether that fallback is intended.
	if !strings.HasPrefix(m.cmdline, childExec) {
		l.Errorf("killProcess child process exec is not exist, pid: %d, exec:%s, cmdline: %s", m.childPid, childExec, m.cmdline)
		return
	}

	killPid := m.childPid
	if ForwardSignalChildGroup == m.forwardSignalType {
		// A negative pid addresses the process group.
		killPid = -m.childPid
		l.Infof("killProcess child process forwarded signal SIGTERM to group %d", m.childPid)
	}
	l.Infof("killProcess child process kill SIGTERM, pid: %d, exec:%s, cmdline: %s", m.childPid, childExec, m.cmdline)
	if err := syscall.Kill(killPid, syscall.SIGTERM); err != nil {
		l.Errorf("killProcess child process kill SIGTERM error: %v", err)
	}
}

// monitorProcess reports whether the child is alive: the executable behind the
// recorded pid must be a prefix of the configured command line and signal 0
// must be deliverable to that pid.
func (m *MonitorDaemon) monitorProcess() bool {
	pid := m.childPid
	if pid == 0 {
		return false
	}

	childExec := getProcExec(fmt.Sprintf("%d", pid))
	// NOTE(review): if common.ReadLink yields "" on failure, the prefix check
	// below passes and liveness rests on kill(pid, 0) alone — confirm.
	if !strings.HasPrefix(m.cmdline, childExec) {
		l.Errorf("monitorProcess child process is not exist, pid: %d, exec:%s, cmdline: %s", m.childPid, childExec, m.cmdline)
		return false
	}

	// Signal 0 performs the existence/permission check without delivering
	// anything.
	if err := syscall.Kill(pid, 0); err != nil {
		l.Errorf("monitorProcess child process kill 0 error: %v, pid: %d, exec:%s, cmdline: %s", err, m.childPid, childExec, m.cmdline)
		return false
	}
	l.Infof("monitorProcess child process is alive, pid: %d, exec:%s, cmdline: %s", m.childPid, childExec, m.cmdline)
	return true
}

// startProcess launches the configured command line and records the pid of the
// new child.
func (m *MonitorDaemon) startProcess() error {
	cmd, err := startProc(m.cmdline)
	if err != nil {
		return err
	}
	if cmd.Process != nil {
		m.childPid = cmd.Process.Pid
	}
	return nil
}

// getProcExec resolves the /proc/<pid>/exe link of the given pid.
func getProcExec(pid string) string {
	return common.ReadLink("/proc/" + pid + "/exe")
}

// startProc runs cmdline through "/bin/bash -c" in a new process group.
//
// The child's standard output is the monitor's standard output so that kubectl
// can collect the logs. The child's standard error is captured and written
// through the logging library; if the pipe cannot be created the child
// inherits the monitor's standard error instead.
func startProc(cmdline string) (*exec.Cmd, error) {
	cmd := exec.Command("/bin/bash", "-c", cmdline)
	// Put the child into its own process group so the parent can forward
	// signals to the group.
	cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
	cmd.Stdout = os.Stdout

	stderrReader, err := cmd.StderrPipe()
	if err != nil {
		l.Error("cmd create stderr pipe err:", err)
		cmd.Stderr = os.Stderr
		stderrReader = nil
	}

	if err = cmd.Start(); err != nil {
		// Start releases the pipe itself on failure; no reader was launched.
		return nil, err
	}

	if stderrReader != nil {
		// The reader is started only after a successful Start. cmd.Wait is
		// never called in this package (children are reaped with wait4), so
		// the read end has to be closed here once the stream ends.
		go func() {
			defer stderrReader.Close()
			readStderr(stderrReader)
		}()
	}
	return cmd, nil
}

// reapChildrenSig performs one non-blocking wait for any child and returns the
// pid whose state changed (0 when none did) together with the wait error.
//
// It is meant for the case where the monitor is PID 1 of the container, where
// it reaps children and grandchildren alike.
//
// WARNING: it must not be combined with any other facility that waits on
// child processes.
func reapChildrenSig() (int, error) {
	// -1 asks wait4 for any child process.
	const anyChild = -1

	var wstatus syscall.WaitStatus
	// Reap them so that zombies do not accumulate.
	pid, err := syscall.Wait4(anyChild, &wstatus, syscall.WNOHANG, nil)
	for err == syscall.EINTR {
		// Retry with the original target rather than the value just returned.
		pid, err = syscall.Wait4(anyChild, &wstatus, syscall.WNOHANG, nil)
	}
	if pid > 0 {
		l.Infof("reaper cleanup: pid=%d, wstatus=%+v, msg=%v", pid, wstatus, waitStatusToError(wstatus))
	}
	return pid, err
}

// waitStatusToError converts a wait status into an error describing how the
// process changed state. A normal exit with status 0 yields nil.
func waitStatusToError(wstatus syscall.WaitStatus) error {
	switch {
	case wstatus.Exited():
		if es := wstatus.ExitStatus(); es != 0 {
			return errors.New(fmt.Sprint(es))
		}
		return nil
	case wstatus.Signaled():
		msg := fmt.Sprintf("signaled %v", wstatus.Signal())
		if wstatus.CoreDump() {
			msg += " (core dumped)"
		}
		return errors.New(msg)
	case wstatus.Stopped():
		msg := fmt.Sprintf("stopped %v", wstatus.StopSignal())
		if trap := wstatus.TrapCause(); trap != -1 {
			msg += fmt.Sprintf(" (trapped %v)", trap)
		}
		return errors.New(msg)
	default:
		return fmt.Errorf("unknown WaitStatus %d", wstatus)
	}
}

// readStderr copies the child's standard error to the error log line by line
// until the stream ends. Empty lines are skipped. Lines longer than the read
// buffer are logged in several pieces instead of aborting the reader, so the
// pipe keeps being drained and the child cannot block on a full pipe.
func readStderr(stderrReader io.Reader) {
	reader := bufio.NewReaderSize(stderrReader, 64*1024)
	for {
		// ReadLine strips the line terminator and hands back over-long lines
		// in buffer-sized fragments.
		chunk, _, err := reader.ReadLine()
		if len(chunk) > 0 {
			l.Error(string(chunk))
		}
		if err != nil {
			if err != io.EOF {
				l.Error("read stderr err:", err)
			}
			return
		}
	}
}

// rollbackForAbnormalCase restores the executable by renaming the backup file
// over cmdline0 and records the MD5 of the restored file. Nothing happens when
// the backup file does not exist or cmdline0 is empty.
func (m *MonitorDaemon) rollbackForAbnormalCase() {
	if _, err := os.Stat(m.agentBackupPath); os.IsNotExist(err) {
		return
	}
	if m.cmdline0 == "" {
		return
	}

	if err := os.Rename(m.agentBackupPath, m.cmdline0); err != nil {
		l.Errorf("monitorProcess rollback agent process failed err:%v", err)
		return
	}

	md5, err := file.GetFileMD5(m.cmdline0)
	if err != nil {
		l.Errorf("calculation of agent MD5(%s) failed err:%s", m.cmdline0, err)
	} else {
		m.agentMd5 = md5
	}
	l.Infof("monitorProcess rollback agent process successful md5:%s", md5)
}