plugin_proc.go 2.83 KB
Newer Older
“李磊”'s avatar
“李磊” committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package pluginmgr

import (
	"fmt"
	"os"
	"os/exec"
	"sync"
	"syscall"
	"time"

	gopsutilProcess "github.com/shirou/gopsutil/process"

	"linkfog.com/public/lib/file"
	"linkfog.com/public/lib/l"
)

var EnableValidatePluginMD5 = true

type pluginProcess struct {
	conf *PluginProcessConf
	cmd  *exec.Cmd
	done chan struct{} // for process exit (wait done)
	mu   *sync.Mutex   // for shutdown mutex
}

type PluginProcessConf struct {
	Name   string `json:"name"`
	Path   string `json:"path"`
	Enable bool   `json:"enable"`
	Mem    uint64 `json:"mem"`
	MD5    string `json:"md5"`
}

func newProcess(conf *PluginProcessConf) (*pluginProcess, error) {
	proc := &pluginProcess{
		conf: conf,
		done: make(chan struct{}),
		mu:   &sync.Mutex{},
	}
	err := proc.start()
	if err != nil {
		return nil, err
	}
	return proc, nil
}

func (p *pluginProcess) start() error {
	l.Infof("start plugin process:%s, path:%s", p.conf.Name, p.conf.Path)
	if !pathExists(p.conf.Path) {
		return fmt.Errorf("plugin process not exist, path:%s", p.conf.Path)
	}

	if EnableValidatePluginMD5 {
		md5, err := file.GetFileMD5(p.conf.Path)
		if err != nil {
			return err
		}
		if md5 != p.conf.MD5 {
			return fmt.Errorf("plugin process %s md5 inconsistent, actual:%s, expected:%s", p.conf.Name, md5, p.conf.MD5)
		}
	}

	cmd := exec.Command(p.conf.Path)
	cmd.Stderr = os.Stderr
	cmd.Stdout = os.Stdout
	err := cmd.Start()
	if err != nil {
		return err
	}
	p.cmd = cmd

	go func() {
		err = p.cmd.Wait()
		if err != nil {
			l.Infof("plugin process %s has exited with error:%v, code:%d", p.conf.Name, err, p.cmd.ProcessState.ExitCode())
		} else {
			l.Infof("plugin process %s has exited with code:%d", p.conf.Name, p.cmd.ProcessState.ExitCode())
		}
		close(p.done)
	}()
	return nil
}

func (p *pluginProcess) shutdown() {
	p.mu.Lock()
	defer p.mu.Unlock()

	l.Info("shutdown plugin process", p.conf.Name)
	if p.isExited() {
		l.Infof("plugin process %s has exited", p.conf.Name)
		return
	}

	select {
	case <-time.After(time.Second * 1):
		l.Infof("because of plugin process %s exit's timeout, will kill it", p.conf.Name)
		syscall.Kill(p.cmd.Process.Pid, syscall.SIGKILL)
		<-p.done // after kill, wait exit
		l.Infof("plugin process %s has been killed", p.conf.Name)
	case <-p.done: // has exited
		l.Infof("plugin process %s has been shutdown gracefully", p.conf.Name)
	}
}

func (p *pluginProcess) pid() int {
	return p.cmd.Process.Pid
}

func (p *pluginProcess) isExited() bool {
	return p.cmd.ProcessState != nil
}

func GetProcRSS(pid int) (rss uint64, err error) {
	var p *gopsutilProcess.Process
	p, err = gopsutilProcess.NewProcess(int32(pid))
	if err != nil {
		return
	}

	var memInfo *gopsutilProcess.MemoryInfoStat
	memInfo, err = p.MemoryInfo()
	if err != nil {
		return
	}

	rss = memInfo.RSS
	return
}

func pathExists(path string) bool {
		_,err := os.Stat(path)
	return err == nil || os.IsExist(err)
}