Commit a3280358 authored by “李磊”'s avatar “李磊”
Browse files

feat: 增加守护进程

parent b0e6d54a
package monitor_daemon
import (
"bufio"
"errors"
"fmt"
"io"
"os"
"os/exec"
"os/signal"
"path/filepath"
"strings"
"syscall"
"time"
"linkfog.com/public/lib/common"
"linkfog.com/public/lib/l"
)
const (
defaultTimerInterval time.Duration = 60 * time.Second
minTimerInterval time.Duration = 10 * time.Second
)
type IsProcAbnormalCallback func() bool
type ForwardSignalType int
const (
NoForwardSignal ForwardSignalType = iota //不转发信号
ForwardSignalChild //转发信号给子进程
ForwardSignalChildGroup //转发信号给子进程所在进程组
ForwardSignalInvalid //无效配置类型
)
type MonitorDaemon struct {
signalSize int
forwardSignalType ForwardSignalType
interval time.Duration
maxRetryTimes int
agentBackupPath string
restartInterval *restartInterval
cmdline string
childPid int
isProcAbnormalCallback IsProcAbnormalCallback
}
type MonitorDaemonOpt func(*MonitorDaemon)
func WithSignalSize(size int) MonitorDaemonOpt {
return func(m *MonitorDaemon) {
m.signalSize = size
}
}
// WithMaxInterval 设置最大的重启间隔,设置后的重启时间间隔可以累增,防止频繁重启
func WithMaxInterval(maxInterval time.Duration) MonitorDaemonOpt {
return func(m *MonitorDaemon) {
if maxInterval > minTimerInterval {
m.restartInterval.setMaxInterval(maxInterval)
}
}
}
// WithIncrInterval 设置每次累增的时间间隔
func WithIncrInterval(incrInterval time.Duration) MonitorDaemonOpt {
return func(m *MonitorDaemon) {
m.restartInterval.setIncr(incrInterval)
}
}
func WithMaxRetryTimes(maxRetryTimes int) MonitorDaemonOpt {
return func(m *MonitorDaemon) {
m.maxRetryTimes = maxRetryTimes
}
}
func WithAgentBackupPath(path string) MonitorDaemonOpt {
return func(m *MonitorDaemon) {
m.agentBackupPath = path
}
}
func WithForwardSignalType(typeF ForwardSignalType) MonitorDaemonOpt {
return func(m *MonitorDaemon) {
if typeF < ForwardSignalInvalid && typeF > NoForwardSignal {
m.forwardSignalType = typeF
}
}
}
func WithTimerInterval(interval time.Duration) MonitorDaemonOpt {
return func(m *MonitorDaemon) {
if interval >= minTimerInterval {
m.interval = interval
} else {
m.interval = minTimerInterval
l.Warnf("%v is less than minTimerInterval(%v), use %v", interval, minTimerInterval, minTimerInterval)
}
}
}
func WithIsProcAbnormalCallback(callback IsProcAbnormalCallback) MonitorDaemonOpt {
return func(m *MonitorDaemon) {
m.isProcAbnormalCallback = callback
}
}
//实现容器内进程的监控与管理,上层调用前,需开启独立的日志功能
/*
* 1. 启动进程:
* a. 新进程加入独立进程组,用于信号转发;
* b. 新进程标准输入输出与当前进程一样,保证kubectl可以拿到日志
* 2. 进程监控:定时监控子进程,如果进程不存在负责启动进程。(监控子进程,最小间隔时间为1分钟)
* 3. 信号处理和转发(可配置两种转发模式):
* a. ForwardSignalChild: 转发信号到子进程
* b. ForwardSignalChildGroup: 转发信号到子进程组
* c. 目前不实现转发信号到容器内所有进程,风险过高。
* 4. 回收孙子进程,防止产生僵尸进程:
* a. 当monitor是容器内1号进程,可回收容器内所有进程。
* b. 非容器1号进程时,只处理子进程退出信号。
* 5. 杀死子进程或子进程组
* a. 提供回调函数注册机制,monitor定时调用此函数,根据返回值判断是否杀死子进程
* b. 若配置了ForwardSignalChildGroup,则杀死整个子进程组
*/
func New(cmdline string, opts ...MonitorDaemonOpt) (*MonitorDaemon, error) {
if cmdline == "" || !filepath.IsAbs(cmdline) {
return nil, fmt.Errorf("cmdline is invalid")
}
m := MonitorDaemon{
signalSize: 10,
forwardSignalType: NoForwardSignal,
interval: defaultTimerInterval,
restartInterval: newRestartInterval(),
cmdline: cmdline,
childPid: 0,
isProcAbnormalCallback: nil,
}
for _, opt := range opts {
opt(&m)
}
//第一次启动进程
err := m.startProcess()
if err != nil {
l.Errorf("start child process failed %s err:%v", cmdline, err)
return nil, err
}
return &m, nil
}
// 此函数是阻塞的,默认不返回。支持以ä‹信号注册和转发:syscall.SIGINT, syscall.SIGTERM, syscall.SIGKILL
func (m *MonitorDaemon) Run(signals ...os.Signal) {
sigs := []os.Signal{
syscall.SIGCHLD,
}
supportSignal := map[syscall.Signal]bool{
syscall.SIGINT: true,
syscall.SIGTERM: true,
syscall.SIGKILL: true,
}
for _, sig := range signals {
if support, ok := supportSignal[sig.(syscall.Signal)]; ok && support {
sigs = append(sigs, sig)
}
}
m.runWithSignal(sigs)
}
func (m *MonitorDaemon) runWithSignal(sigs []os.Signal) {
l.Infof("tick: %+v", m.interval)
tick := time.NewTimer(m.interval)
defer tick.Stop()
var notifications = make(chan os.Signal, m.signalSize)
signal.Notify(notifications, sigs...)
errRetryTimes := 0
for {
select {
case sig := <-notifications:
if sig == syscall.SIGCHLD {
// 当子进程收到SIGSTOP和SIGCONT信号时,monitor会接收到SIGCHLD,因此使用debug日志记录
l.Debug("monitor_daemon recv sig, sig:", sig)
for {
killPid, err := reapChildrenSig()
if err != nil {
l.Errorf("monitor_daemon reapChildrenSig error:%v", err)
break
} else if killPid <= 0 {
//当子进程收到SIGSTOP和SIGCONT信号时,wait4获取到killPid为0,表示子进程状态未发生改变
break
}
//如果wait的pid是子进程,并且开启了转发信号开关,则进行信号发送到子进程所在的进程组
if killPid == m.childPid && ForwardSignalChildGroup == m.forwardSignalType {
l.Infof("monitor_daemon forwarded signal SIGCHLD to children group %d", m.childPid)
err := syscall.Kill(-m.childPid, syscall.SIGTERM)
if err != nil {
l.Errorf("monitor_daemon forwarded signal SIGCHLD, send syscall.SIGTERM to children group %d, err %v", m.childPid, err)
}
} else {
l.Infof("monitor_daemon not forwarded signal SIGCHLD, killPid: %d, childPid: %d", killPid, m.childPid)
}
}
} else {
l.Info("monitor_daemon recv sig, sig:", sig)
//TODO: signal check
if ForwardSignalChild == m.forwardSignalType {
l.Infof("forwarded signal %d to children %d", sig, m.childPid)
err := syscall.Kill(m.childPid, sig.(syscall.Signal))
if err != nil {
l.Errorf("forwarded signal %d to children %d, err %v", sig, m.childPid, err)
}
} else if ForwardSignalChildGroup == m.forwardSignalType {
l.Infof("forwarded signal %d to children group %d", sig, m.childPid)
err := syscall.Kill(-m.childPid, sig.(syscall.Signal))
if err != nil {
l.Errorf("forwarded signal %d to children group %d, err %v", sig, m.childPid, err)
}
}
l.Infof("exit monitor_daemon pid: %d, child pid: %d, signal: %d", os.Getpid(), m.childPid, sig)
return
}
case <-tick.C:
isAlive := m.monitorProcess()
// 时间间隔未达到允许的时间间隔
if !isAlive && !m.restartInterval.isNeedStart() {
l.Infof("process will restart, curInterval=%s last=%s", m.restartInterval.GetCurInterval(),
m.restartInterval.GetLastStart().Format("2006-01-02 15:04:05"))
}
if !isAlive && m.restartInterval.isNeedStart() {
//忽略错误,只打印日志
err := m.startProcess()
if err != nil {
errRetryTimes++
l.Errorf("monitorProcess start child process failed %s, errRetryTimes:%d, err:%v", m.cmdline, errRetryTimes, err)
if errRetryTimes > m.maxRetryTimes {
if _, err := os.Stat(m.agentBackupPath); err == nil || !os.IsNotExist(err) {
agentName := strings.Split(m.cmdline, " ")[0]
if agentName != "" {
err := os.Rename(m.agentBackupPath, agentName)
if err != nil {
l.Errorf("monitorProcess rollback agent process failed err:%v", err)
} else {
errRetryTimes = 0
}
}
} else {
errRetryTimes = 0
}
}
} else {
errRetryTimes = 0
l.Infof("monitorProcess start child process success pid: %d", m.childPid)
}
m.restartInterval.setLastStart()
}
//当回调返回true时,进程异常,直接杀死子进程或子进程组
if isAlive && m.isProcAbnormalCallback != nil {
isAbnormal := m.isProcAbnormalCallback()
if isAbnormal {
m.killProcess()
}
}
tick.Reset(m.interval)
}
}
}
func (m *MonitorDaemon) killProcess() {
pid := m.childPid
if pid != 0 {
childExec := getProcExec(fmt.Sprintf("%d", pid))
if strings.HasPrefix(m.cmdline, childExec) {
killPid := m.childPid
if ForwardSignalChildGroup == m.forwardSignalType {
killPid = -m.childPid
l.Infof("killProcess child process forwarded signal SIGTERM to group %d", m.childPid)
}
l.Infof("killProcess child process kill SIGTERM, pid: %d, exec:%s, cmdline: %s", m.childPid, childExec, m.cmdline)
err := syscall.Kill(killPid, syscall.SIGTERM)
if err != nil {
l.Errorf("killProcess child process kill SIGTERM error: %v", err)
}
} else {
l.Errorf("killProcess child process exec is not exist, pid: %d, exec:%s, cmdline: %s", m.childPid, childExec, m.cmdline)
}
} else {
l.Errorf("killProcess child process pid is not exist, pid: %d, cmdline: %s", m.childPid, m.cmdline)
}
}
func (m *MonitorDaemon) monitorProcess() bool {
pid := m.childPid
if pid != 0 {
childExec := getProcExec(fmt.Sprintf("%d", pid))
if strings.HasPrefix(m.cmdline, childExec) {
err := syscall.Kill(pid, 0)
if err != nil {
l.Errorf("monitorProcess child process kill 0 error: %v, pid: %d, exec:%s, cmdline: %s", err, m.childPid, childExec, m.cmdline)
} else {
l.Infof("monitorProcess child process is alive, pid: %d, exec:%s, cmdline: %s", m.childPid, childExec, m.cmdline)
return true
}
} else {
l.Errorf("monitorProcess child process is not exist, pid: %d, exec:%s, cmdline: %s", m.childPid, childExec, m.cmdline)
}
}
return false
}
func (m *MonitorDaemon) startProcess() error {
cmd, err := startProc(m.cmdline)
if err != nil {
return err
}
if cmd.Process != nil {
m.childPid = cmd.Process.Pid
}
return nil
}
func getProcExec(pid string) string {
return common.ReadLink("/proc/" + pid + "/exe")
}
func startProc(cmdline string) (*exec.Cmd, error) {
cmd := exec.Command("/bin/bash", "-c", cmdline)
//设置子进程加入独立进程组,供父进程转发信号用
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
// 设置子进程的标准输出为系统标准输出,保证kubectl可以拿到日志
// 捕获子进程的标准错误,用日志库输出(既保存到持久化文件,又打印到标准输出),若捕获失败,则设置为系统标准错误
cmd.Stdout = os.Stdout
stderrReader, err := cmd.StderrPipe()
if err != nil {
l.Error("cmd create stderr pipe err:", err)
cmd.Stderr = os.Stderr
} else {
go readStderr(stderrReader)
}
err = cmd.Start()
if err != nil {
return nil, err
}
return cmd, nil
}
// 容器1号进程时生效,用于回收儿子和孙子进程。
// 警告:
// 警告:
// 警告:不可与其他带wait操作的功能同时使用
func reapChildrenSig() (int, error) {
pid := -1 //wait all sub process
opts := syscall.WNOHANG
var err error = nil
var wstatus syscall.WaitStatus
/*
* Reap 'em, so that zombies don't accumulate.
* Plants vs. Zombies!!
*/
pid, err = syscall.Wait4(pid, &wstatus, opts, nil)
for syscall.EINTR == err {
pid, err = syscall.Wait4(pid, &wstatus, opts, nil)
}
if pid > 0 {
l.Infof("reaper cleanup: pid=%d, wstatus=%+v, msg=%v",
pid, wstatus, waitStatusToError(wstatus))
}
return pid, err
} /* End of function reapChildren. */
func waitStatusToError(wstatus syscall.WaitStatus) error {
switch {
case wstatus.Exited():
es := wstatus.ExitStatus()
if es == 0 {
return nil
}
return errors.New(fmt.Sprint(es))
case wstatus.Signaled():
msg := fmt.Sprintf("signaled %v", wstatus.Signal())
if wstatus.CoreDump() {
msg += " (core dumped)"
}
return errors.New(msg)
case wstatus.Stopped():
msg := fmt.Sprintf("stopped %v", wstatus.StopSignal())
trap := wstatus.TrapCause()
if trap != -1 {
msg += fmt.Sprintf(" (trapped %v)", trap)
}
return errors.New(msg)
default:
return fmt.Errorf("unknown WaitStatus %d", wstatus)
}
}
func readStderr(stderrReader io.Reader) {
scanner := bufio.NewScanner(stderrReader)
for scanner.Scan() {
stderrData := scanner.Text()
if stderrData != "" {
l.Error(stderrData)
}
}
if err := scanner.Err(); err != nil {
l.Error("read stderr err:", err)
}
}
package monitor_daemon
import (
"os"
"strings"
"syscall"
"testing"
"linkfog.com/public/lib/l"
)
/*
Run方法:
通用部分:
l.Info("======monitor_daemon start======")
cmdline := strings.Replace(strings.Join(append(os.Args, "-enable-monitor-daemon=false"), " "), "-D", "", -1)
opts := []monitor_daemon.MonitorDaemonOpt{
monitor_daemon.WithForwardSignalType(monitor_daemon.ForwardSignalChildGroup),
}
monitorDaemon, err := monitor_daemon.New(cmdline, opts...)
if err != nil {
l.Error("monitor_daemon new error", cmdline)
os.Exit(1)
}
Run示例1:
go func() {
monitorDaemon.Run()
l.Info("======monitor_daemon stop======")
}()
return nil
Run示例2:
monitorDaemon.Run(syscall.SIGINT, syscall.SIGTERM, syscall.SIGKILL)
l.Info("======monitor_daemon stop======")
os.Exit(1)
*/
func TestMonitorDaemonRun(t *testing.T) {
l.Info("======monitor_daemon start======")
cmdline := strings.Replace(strings.Join(append(os.Args, "-enable-monitor-daemon=false"), " "), "-D", "", -1)
opts := []MonitorDaemonOpt{
WithForwardSignalType(ForwardSignalChildGroup),
}
monitorDaemon, err := New(cmdline, opts...)
if err != nil {
l.Error("monitor_daemon new error", cmdline)
os.Exit(1)
}
//Run示例1:
go func() {
monitorDaemon.Run()
l.Info("======monitor_daemon stop======")
}()
return
}
func TestMonitorDaemonRunSignal(t *testing.T) {
l.Info("======monitor_daemon start======")
cmdline := strings.Replace(strings.Join(append(os.Args, "-enable-monitor-daemon=false"), " "), "-D", "", -1)
opts := []MonitorDaemonOpt{
WithForwardSignalType(ForwardSignalChildGroup),
}
monitorDaemon, err := New(cmdline, opts...)
if err != nil {
l.Error("monitor_daemon new error", cmdline)
os.Exit(1)
}
monitorDaemon.Run(syscall.SIGINT, syscall.SIGTERM, syscall.SIGKILL)
l.Info("======monitor_daemon stop======")
os.Exit(1)
}
package monitor_daemon
import (
"fmt"
"time"
)
// 用途: 当前文件用来控制进程重启的间隔时间
// 同时满足如下条件则不重启:
// 1. 设置了最大间隔时间
// 2. 距离上次重启小于累增的间隔
const (
DefaultIncrInterval = 20 * time.Second // 建议值: 每次重启累增的时间间隔
DefaultMaxInterval = 20 * time.Minute // 建议值: 重启累增时间间隔的最大值,增到这个值后平稳重启,不再累增
)
// 管理重启时间间隔
type restartInterval struct {
config *restartIntervalConfig // 重启时间间隔的配置
state *restartIntervalState // 重启时间间隔的状态
}
// 固定的配置
type restartIntervalConfig struct {
maxInterval time.Duration // 防止持续重启,加上递增逻辑,重启间隔可以递增到当前最大值,不设置则按照默认interval进行重启
incr time.Duration // 每次累增的间隔
stableDura time.Duration // 运行的时间间隔大于该值认为是稳定的,可以重置重累增的间隔,防止因为偶发问题无法及时拉起
}
// 变化的状态
type restartIntervalState struct {
lastStart time.Time // 上次重启时间
curInterval time.Duration // 累增后的时间间隔
}
func newRestartInterval() *restartInterval {
return &restartInterval{
config: &restartIntervalConfig{
maxInterval: 0,
incr: 0,
},
state: &restartIntervalState{
curInterval: 0,
lastStart: time.Now(),
},
}
}
func (r *restartInterval) setMaxInterval(maxInterval time.Duration) {
if maxInterval == 0 {
return
}
r.config.maxInterval = maxInterval
r.config.stableDura = 2 * maxInterval // 2倍只是一个大约数
}
func (r *restartInterval) setIncr(incr time.Duration) {
r.config.incr = incr
}
// setLastStart 需要设置: 1.上次启动时间; 2.下次启动要求的时间间隔累增;
func (r *restartInterval) setLastStart() {
if r.config.maxInterval == 0 {
return
}
// 设置本次启动的时间
defer func() {
r.state.lastStart = time.Now()
}()
// 已经稳定运行了一段时间后的重启,将重启间隔重置为类似第一次启动
if r.state.lastStart.Add(r.config.stableDura).Before(time.Now()) {
r.state.curInterval = r.config.incr
return
}
// 已经添加到最大值了,不再添加了
if r.state.curInterval == r.config.maxInterval {
return
}
// 递增重启间隔后如果小于最大间隔,则递增,否则采用最大间隔
if r.state.curInterval+r.config.incr < r.config.maxInterval {
r.state.curInterval = r.state.curInterval + r.config.incr
} else {
r.state.curInterval = r.config.maxInterval
}
}
func (r *restartInterval) isNeedStart() bool {
// 未启用每次重启加大时间间隔的特性
if r.config.maxInterval == 0 {
return true
}
// 达到时间要求的时间间隔,则可以重启
if r.state.lastStart.Add(r.state.curInterval).Before(time.Now()) {
return true
}
return false
}
func (r *restartInterval) GetLastStart() time.Time {
return r.state.lastStart
}
func (r *restartInterval) GetCurInterval() time.Duration {
return r.state.curInterval
}
func (r *restartInterval) String() string {
return fmt.Sprintf("config: maxInterval=%s incr=%s state: curInterval=%s lastStart:%s", r.config.maxInterval, r.config.incr, r.state.curInterval, r.state.lastStart.Format("2006-01-02 15:04:05"))
}
package monitor_daemon
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestIsNeedStart(t *testing.T) {
r := newRestartInterval()
r.config.incr = time.Second
r.setLastStart()
assert.Equal(t, r.isNeedStart(), false)
time.Sleep(time.Second)
assert.Equal(t, r.isNeedStart(), false)
r.config.maxInterval = time.Second
r.setLastStart()
assert.Equal(t, r.isNeedStart(), false)
time.Sleep(time.Second)
t.Log(r)
assert.Equal(t, r.isNeedStart(), true)
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment