Commit b0e6d54a authored by 李磊

feat: Add some toolkits

parent dd9ff9ae
package file
import (
"bufio"
"bytes"
"crypto/md5"
"crypto/sha256"
"encoding/hex"
"fmt"
"io"
"os"
"path/filepath"
"regexp"
"strings"
"time"
"unicode"
)
func GetFileMD5(file string) (string, error) {
info, err := os.Stat(file)
if err != nil {
return "", err
}
if !info.Mode().IsRegular() {
return "", fmt.Errorf("not regular file")
}
f, err := os.Open(file)
if err != nil {
return "", err
}
defer f.Close()
defer FadviseSwitch(f)
h := md5.New()
if _, err := io.Copy(h, f); err != nil {
return "", err
}
return fmt.Sprintf("%x", h.Sum(nil)), nil
}
func ReadLine(reader io.Reader, f func(string) error) error {
buf := bufio.NewReader(reader)
line, err := buf.ReadBytes('\n')
for err == nil {
line = bytes.TrimRight(line, "\n")
if len(line) > 0 {
if line[len(line)-1] == '\r' { // strip the CR of CRLF line endings
line = bytes.TrimRight(line, "\r")
}
err = f(string(line))
if err != nil {
return err
}
}
line, err = buf.ReadBytes('\n')
}
if len(line) > 0 {
err = f(string(line))
}
if err != io.EOF {
return err
}
return nil
}
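// Usage sketch for ReadLine (illustrative only; the path is hypothetical):
//
//	f, err := os.Open("/tmp/example.log")
//	if err != nil {
//		return err
//	}
//	defer f.Close()
//	err = ReadLine(f, func(line string) error {
//		fmt.Println(line)
//		return nil // returning a non-nil error stops the iteration early
//	})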
func GetConfigKeyValue(filename, commentSymbol, separator, key string) (
value string, exists bool) {
info, err := os.Stat(filename)
if err != nil {
return
}
if !info.Mode().IsRegular() {
return
}
f, err := os.Open(filename)
if err != nil {
return
}
defer f.Close()
ReadLine(f, func(line string) error {
line = strings.TrimSpace(line)
if commentSymbol != "" {
if strings.HasPrefix(strings.TrimSpace(line), commentSymbol) {
return nil
}
}
var info []string
if separator == " " {
// split key and value on the first unicode space character
if i := strings.IndexFunc(line, unicode.IsSpace); i >= 0 {
_, w := utf8.DecodeRuneInString(line[i:])
info = []string{line[:i], line[i+w:]}
}
} else {
info = strings.SplitN(line, separator, 2)
}
if len(info) < 2 {
return nil
}
if strings.TrimSpace(info[0]) != key {
return nil
}
exists = true
value = strings.TrimSpace(info[1])
return fmt.Errorf("break readline")
})
return
}
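// Usage sketch for GetConfigKeyValue (illustrative; the file and key are
// hypothetical). Given a line "PermitRootLogin no" in the file, with a single
// space as separator:
//
//	value, ok := GetConfigKeyValue("/etc/ssh/sshd_config", "#", " ", "PermitRootLogin")
//	if ok {
//		fmt.Println(value) // "no"
//	}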
func SearchString(path string, regex string, commentSymbol string) []string {
deadline := time.Now().Add(30 * time.Second)
var ret []string
info, err := os.Stat(path)
if err != nil {
return ret
}
if !info.Mode().IsRegular() {
return ret
}
f, err := os.Open(path)
if err != nil {
return ret
}
defer f.Close()
defer FadviseSwitch(f)
reg, err := regexp.Compile(regex)
if err != nil {
return ret
}
ReadLine(f, func(line string) error {
if time.Now().After(deadline) {
err = fmt.Errorf("timeout reached")
return err
}
if commentSymbol != "" {
if strings.HasPrefix(strings.TrimSpace(line), commentSymbol) {
return nil
}
}
match := reg.FindStringSubmatch(line)
if len(match) > 0 {
ret = match
return fmt.Errorf("break readline")
}
return nil
})
return ret
}
func SearchStringR(path string, reg *regexp.Regexp, commentSymbol string) []string {
deadline := time.Now().Add(30 * time.Second)
var ret []string
info, err := os.Stat(path)
if err != nil {
return ret
}
if !info.Mode().IsRegular() {
return ret
}
f, err := os.Open(path)
if err != nil {
return ret
}
defer f.Close()
defer FadviseSwitch(f)
ReadLine(f, func(line string) error {
if time.Now().After(deadline) {
err = fmt.Errorf("timeout reached")
return err
}
if commentSymbol != "" {
if strings.HasPrefix(strings.TrimSpace(line), commentSymbol) {
return nil
}
}
match := reg.FindStringSubmatch(line)
if len(match) > 0 {
ret = match
return fmt.Errorf("break readline")
}
return nil
})
return ret
}
func WalkDir(dirpath, suffix string) (files []string, err error) {
files = make([]string, 0, 30)
suffix = strings.ToUpper(suffix)
err = filepath.Walk(dirpath, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if info.IsDir() {
return nil
}
if strings.HasSuffix(strings.ToUpper(info.Name()), suffix) {
files = append(files, path)
}
return nil
})
return files, err
}
func GetFileSha256(file string) (string, error) {
info, err := os.Stat(file)
if err != nil {
return "", err
}
if !info.Mode().IsRegular() {
return "", fmt.Errorf("not regular file")
}
f, err := os.Open(file)
if err != nil {
return "", err
}
defer f.Close()
defer FadviseSwitch(f)
h := sha256.New()
if _, err := io.Copy(h, f); err != nil {
return "", err
}
return fmt.Sprintf("%x", h.Sum(nil)), nil
}
// ReadLines reads contents from a file and splits them by new lines.
// A convenience wrapper to ReadLinesOffsetN(filename, 0, -1).
func ReadLines(filename string) ([]string, error) {
return ReadLinesOffsetN(filename, 0, -1)
}
// ReadLinesOffsetN reads contents from file and splits them by new line.
// The offset tells at which line number to start.
// The count determines the number of lines to read (starting from offset):
//
// n >= 0: at most n lines
// n < 0: whole file
func ReadLinesOffsetN(filename string, offset uint, n int) ([]string, error) {
info, err := os.Stat(filename)
if err != nil {
return []string{}, err
}
if !info.Mode().IsRegular() {
return []string{}, fmt.Errorf("not a regular file")
}
f, err := os.Open(filename)
if err != nil {
return []string{""}, err
}
defer f.Close()
defer FadviseSwitch(f)
var ret []string
r := bufio.NewReader(f)
for i := 0; i < n+int(offset) || n < 0; i++ {
line, err := r.ReadString('\n')
if err != nil {
break
}
if i < int(offset) {
continue
}
ret = append(ret, strings.Trim(line, "\n"))
}
return ret, nil
}
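// Usage sketch for ReadLinesOffsetN (illustrative):
//
//	// read at most 5 lines, skipping the first 2
//	lines, err := ReadLinesOffsetN("/proc/meminfo", 2, 5)
//	if err == nil {
//		for _, ln := range lines {
//			fmt.Println(ln)
//		}
//	}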
func ReadLinesOffset(fileName string, offset int) ([]string, error) {
info, err := os.Stat(fileName)
if err != nil {
return []string{}, err
}
if !info.Mode().IsRegular() {
return []string{}, fmt.Errorf("not a regular file")
}
f, err := os.Open(fileName)
if err != nil {
return []string{""}, err
}
defer f.Close()
var ret []string
r := bufio.NewReader(f)
// note: offset is a byte offset into the file, not a line offset
discard, err := r.Discard(offset)
if err != nil {
return nil, fmt.Errorf("reader discard error: %v", err)
}
if discard != offset {
return nil, fmt.Errorf("discarded %d bytes, expected offset %d", discard, offset)
}
for {
line, err := r.ReadString('\n')
if err != nil {
if err != io.EOF {
return nil, fmt.Errorf("unknown error in ReadLinesOffset")
}
break
}
ret = append(ret, strings.Trim(line, "\n"))
}
return ret, nil
}
func GetSize(fileName string) (int, error) {
fileInfo, err := os.Stat(fileName)
if err != nil {
return -1, err
}
return int(fileInfo.Size()), nil
}
// Exists reports whether the file or directory at the given path exists.
func Exists(path string) bool {
_, err := os.Stat(path)
if err != nil {
if os.IsExist(err) {
return true
}
return false
}
return true
}
// IsFile reports whether the given path exists and is a file (not a directory).
func IsFile(path string) bool {
f, err := os.Stat(path)
if err == nil && !f.IsDir() {
return true
}
return false
}
// ETag algorithm used by the file server
const blockSize = 16 * 1024 * 1024 // 16MB per block
func getFileMD5(file string) ([]byte, error) {
info, err := os.Stat(file)
if err != nil {
return nil, err
}
if !info.Mode().IsRegular() {
return nil, fmt.Errorf("not regular file")
}
f, err := os.Open(file)
if err != nil {
return nil, err
}
defer f.Close()
defer FadviseSwitch(f)
h := md5.New()
if _, err := io.Copy(h, f); err != nil {
return nil, err
}
return h.Sum(nil), nil
}
func CalcMemoryMD5Hash(data []byte) []byte {
hash := md5.New()
hash.Write(data)
return hash.Sum(nil)
}
func calculateBlockETags(filePath string) ([]byte, int, error) {
file, err := os.Open(filePath)
if err != nil {
return nil, 0, err
}
defer file.Close()
defer FadviseSwitch(file)
var blockETags []byte
buffer := make([]byte, blockSize)
num := 0
for {
n, err := file.Read(buffer)
if err != nil && err != io.EOF {
return nil, 0, err
}
if n == 0 {
break
}
sum := CalcMemoryMD5Hash(buffer[:n])
num++
blockETags = append(blockETags, sum...)
}
return blockETags, num, nil
}
func FSCalculateFileETag(filePath string) (string, error) {
info, err := os.Stat(filePath)
if err != nil {
return "", err
}
if info.Size() <= blockSize {
etag, err := getFileMD5(filePath)
return hex.EncodeToString(etag), err
}
blockETags, num, err := calculateBlockETags(filePath)
if err != nil {
return "", err
}
return fmt.Sprintf("%s-%d", hex.EncodeToString(CalcMemoryMD5Hash(blockETags)), num), nil
}
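// Worked example of the ETag scheme above (sizes illustrative): a 40MB file
// spans blocks of 16MB+16MB+8MB, so three per-block MD5 digests are
// concatenated, hashed once more with MD5, and rendered as "<hex>-3"; a file
// of at most blockSize bytes yields a plain MD5 hex digest with no suffix.
//
//	etag, err := FSCalculateFileETag("/tmp/big.bin")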
package file
import (
"bufio"
"bytes"
"io"
"os"
"path/filepath"
"linkfog.com/public/lib/fadvise"
"linkfog.com/public/lib/l"
"linkfog.com/public/option"
)
func FadviseSwitch(f *os.File) {
if err := fadvise.Switch(f); err != nil {
l.Error(err)
}
}
func PurgeSwitch(path string) {
if !option.Fadvise {
return
}
if err := purge(path); err != nil {
l.Error(err)
}
}
func purge(path string) error {
f, err := os.Open(path)
if err != nil {
return err
}
defer f.Close()
err = fadvise.Switch(f)
if err != nil {
return err
}
return nil
}
func ReadFileFadvise(name string) ([]byte, error) {
f, err := os.Open(name)
if err != nil {
return nil, err
}
defer f.Close()
defer FadviseSwitch(f)
var size int
if info, err := f.Stat(); err == nil {
size64 := info.Size()
if int64(int(size64)) == size64 {
size = int(size64)
}
}
size++ // one byte for final read at EOF
// If a file claims a small size, read at least 512 bytes.
// In particular, files in Linux's /proc claim size 0 but
// then do not work right if read in small pieces,
// so an initial read of 1 byte would not work correctly.
if size < 512 {
size = 512
}
data := make([]byte, 0, size)
for {
if len(data) >= cap(data) {
d := append(data[:cap(data)], 0)
data = d[:len(data)]
}
n, err := f.Read(data[len(data):cap(data)])
data = data[:len(data)+n]
if err != nil {
if err == io.EOF {
err = nil
}
return data, err
}
}
}
// ReadFileIntoBufferFadvise reads an arbitrary file into a buffer.
func ReadFileIntoBufferFadvise(filename string, buf *bytes.Buffer) (int64, error) {
f, err := os.Open(filename)
if err != nil {
return -1, err
}
defer f.Close()
defer FadviseSwitch(f)
return buf.ReadFrom(f)
}
func ReadFileLineFadvise(filePath string) (line string, err error) {
if !exist(filePath) {
return line, l.WrapError("file not exist.")
}
fi, err := os.Open(filePath)
if err != nil {
return line, l.WrapError("open file err:", err, "filePath:", filePath)
}
defer fi.Close()
defer FadviseSwitch(fi)
br := bufio.NewReader(fi)
// only the first line of the file is returned
a, _, c := br.ReadLine()
if c == io.EOF {
return line, err
}
return string(a), err
}
func exist(filename string) bool {
_, err := os.Stat(filename)
return err == nil || !os.IsNotExist(err)
}
// WalkPurgeSwitch walks a folder and purges the page cache of every regular file
func WalkPurgeSwitch(path string) {
if !option.Fadvise {
return
}
if err := walkPurge(path); err != nil {
l.Error(err)
}
}
func walkPurge(path string) error {
return filepath.Walk(path,
func(path string, info os.FileInfo, err error) error {
if err != nil {
l.Error(err)
return nil
}
if !info.Mode().IsRegular() {
return nil
}
err = purge(path)
if err != nil {
l.Error(err)
}
return nil
})
}
package file
import (
"fmt"
"io"
"os"
"time"
"linkfog.com/public/lib/l"
)
type FileStreamer struct {
Lines chan string
path string
fd *os.File
fInfo os.FileInfo
closed bool
}
// NewStreamer wraps the file at the given path in a FileStreamer so the file
// can be read line by line from the Lines channel. The FileStreamer reopens
// the path if the file has been rotated.
func NewStreamer(path string, seekToEnd bool) (*FileStreamer, error) {
fInfo, err := os.Stat(path)
if err != nil {
return nil, fmt.Errorf("stat file err:%s", err)
}
fd, err := os.Open(path)
if err != nil {
return nil, fmt.Errorf("open file err:%s", err)
}
if seekToEnd {
offset, err := fd.Seek(0, io.SeekEnd)
if err != nil {
closeErr := fd.Close()
if closeErr != nil {
l.Warn("file close err", closeErr)
}
return nil, fmt.Errorf("seek to file end err:%s", err)
}
l.Debug("seek offset:", offset)
}
f := &FileStreamer{
path: path,
fd: fd,
fInfo: fInfo,
Lines: make(chan string, 1),
}
go f.readLines()
return f, nil
}
func (f *FileStreamer) readLines() {
// to recover send on closed channel panic when call f.Close()
defer func() {
err := recover()
if err != nil {
l.Error("readLines recover:", err)
}
}()
for !f.closed {
if f.fd != nil {
err := ReadLine(f.fd, func(line string) error {
if f.closed {
return fmt.Errorf("stream closed, break readline")
}
f.Lines <- line
return nil
})
if err != nil {
l.Error("read line err:", err)
}
FadviseSwitch(f.fd)
}
time.Sleep(5 * time.Second)
if !f.closed {
fInfo, err := os.Stat(f.path)
if err != nil {
l.Warn("stat file err:", err)
continue
}
offset, err := f.fd.Seek(0, io.SeekCurrent)
if err != nil {
l.Warn("seek current err:", err)
}
if os.SameFile(fInfo, f.fInfo) && offset <= fInfo.Size() {
continue
} else {
f.fd.Close()
f.fd, err = os.Open(f.path)
if err != nil {
l.Error("open file err:", err)
continue
}
f.fInfo = fInfo
}
}
}
f.fd.Close()
close(f.Lines)
}
func (f *FileStreamer) Close() {
f.closed = true
}
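// Usage sketch for FileStreamer (illustrative; the path is hypothetical):
//
//	fs, err := NewStreamer("/var/log/app.log", true) // seek to end and tail new lines
//	if err != nil {
//		return err
//	}
//	defer fs.Close()
//	for line := range fs.Lines {
//		fmt.Println(line)
//	}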
package file
import (
"fmt"
"testing"
)
func TestFileStreamer(t *testing.T) {
fStreamer, err := NewStreamer("/tmp/test.log", false)
if err != nil {
t.Fatal(err)
}
for line := range fStreamer.Lines {
fmt.Println(line)
}
fStreamer.Close()
}
package file
import (
"testing"
"linkfog.com/public/lib/l"
)
func TestSearchString(t *testing.T) {
l.Debug(SearchString("/tmp/dirtyc0w", "madviseThread", ""))
}
package file
import (
"net/http"
"os"
)
func GetFileContentType(out *os.File) (string, error) {
// Only the first 512 bytes are used to sniff the content type.
buffer := make([]byte, 512)
_, err := out.Read(buffer)
if err != nil {
if err == io.EOF {
return "", nil
}
return "", err
}
// Use the net/http package's DetectContentType function. It always returns a
// valid content type, falling back to "application/octet-stream" if nothing
// else matches.
contentType := http.DetectContentType(buffer)
return contentType, nil
}
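// Usage sketch for GetFileContentType (illustrative). Note that the call
// advances the file offset by up to 512 bytes, so seek back before reading
// the file again:
//
//	f, _ := os.Open("/tmp/image.png")
//	defer f.Close()
//	ct, err := GetFileContentType(f) // e.g. "image/png"
//	f.Seek(0, io.SeekStart)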
package file
import (
"bytes"
"errors"
"io"
"io/ioutil"
"os"
"linkfog.com/public/lib/file/config"
log "linkfog.com/public/lib/l"
"linkfog.com/public/pkg/ratelimit"
)
// errInvalidWrite means that a write returned an impossible count.
var errInvalidWrite = errors.New("invalid write result")
func WriteFile(name string, data []byte, perm os.FileMode) error {
if !config.EnableRateLimitBucket() {
log.Debug("write file no rate limit.")
err := ioutil.WriteFile(name, data, perm)
if err != nil {
return log.WrapError("no rate limit write file err:")
}
return err
}
f, err := os.OpenFile(name, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm)
if err != nil {
return log.WrapError("write file with rateLimit open file err:", err)
}
defer f.Close()
// ratelimit.NewBucket(1000*time.Nanosecond, 1*1024*1024)
rateWriter := ratelimit.Writer(f, config.RateLimitBucket())
bytesReader := bytes.NewReader(data)
// reuse one bucket-sized buffer across iterations instead of allocating per loop
buf := make([]byte, config.RateLimitBucket().Capacity())
for {
n, err := bytesReader.Read(buf)
if err != nil {
if err != io.EOF {
return log.WrapError("write file with rateLimit read data err:", err)
}
break
}
// write only the n bytes actually read
if _, err = rateWriter.Write(buf[:n]); err != nil {
return log.WrapError("write file with rateLimit write data err:", err)
}
}
return nil
}
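// Usage sketch for WriteFile (illustrative; whether writes are throttled
// depends on config.EnableRateLimitBucket()):
//
//	if err := WriteFile("/tmp/out.txt", []byte("hello"), 0644); err != nil {
//		log.Error(err)
//	}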
package file
import (
"encoding/json"
"testing"
)
func TestWriteFile(t *testing.T) {
fPath := "./test"
data := []byte{}
aData, _ := json.Marshal("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\\n")
for i := 0; i < 10000000; i++ {
data = append(data, aData...)
}
t.Log("finished", len(data))
err := WriteFile(fPath, data, 0700)
if err != nil {
t.Error(err)
}
}
package auto
import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sort"
"strings"
"time"
"linkfog.com/public/lib/l"
)
func (p *auto) clean() {
p.cleanByDate()
p.cleanBySpace()
}
// cleanByDate removes profile files older than the configured retention period
func (p *auto) cleanByDate() {
fds, err := ioutil.ReadDir(p.config.PProfProfilePath)
if err != nil {
l.Error("read pprof file err", err)
return
}
if len(fds) < p.config.RetentionDays {
return
}
for _, fd := range fds {
names := strings.Split(fd.Name(), "-")
if len(names) < 3 {
continue
}
date, err := time.Parse("20060102", names[0])
if err != nil {
continue
}
if time.Now().Add(-time.Duration(p.config.RetentionDays) * 24 * time.Hour).After(date) {
err = os.Remove(filepath.Join(p.config.PProfProfilePath, fd.Name()))
if err != nil {
l.Error(err)
}
}
}
}
// cleanBySpace deletes the oldest pprof files until disk usage falls below the configured maximum
func (p *auto) cleanBySpace() {
var fileSize float64
var fileNames []string
fileNameSize := make(map[string]float64, 1)
fds, err := ioutil.ReadDir(p.config.PProfProfilePath)
if err != nil {
l.Error("read pprof file err", err)
return
}
for _, v := range fds {
names := strings.Split(v.Name(), "-")
if len(names) < 3 {
continue
}
fileSize = fileSize + float64(v.Size())
fileNames = append(fileNames, v.Name())
fileNameSize[v.Name()] = float64(v.Size())
}
sort.Strings(fileNames)
for _, v := range fileNames {
if fileSize > p.maxDiskUsage {
l.Warn("disk usage is", fmt.Sprintf("%.2fMB", float64(fileSize)/1024/1024))
err = os.Remove(filepath.Join(p.config.PProfProfilePath, v))
if err != nil {
l.Error(err)
continue
}
if value, ok := fileNameSize[v]; ok {
fileSize = fileSize - value
continue
}
}
break
}
}
package auto
import (
"bufio"
"fmt"
"io/ioutil"
"os"
"testing"
"time"
)
func (p *auto) Dirs(t *testing.T) (int64, int) {
dirEntry, err := ioutil.ReadDir(p.config.PProfProfilePath)
if err != nil {
t.Fatal(err)
return 0, 0
}
var size int64
for _, entry := range dirEntry {
size = size + entry.Size()
}
return size, len(dirEntry)
}
func TestAutoExpirationClean(t *testing.T) {
dir := BuildFile(t)
defer os.RemoveAll(dir)
p := NewAutoPProf(Config{
PProfProfilePath: dir,
})
p.init()
size, num := p.Dirs(t)
t.Logf("size %d,num:%d", size, num)
type fields struct {
config Config
fixedQuotient int
perHourQuotient []int
detectInterval time.Duration
recordSensitivity float64
maxDiskUsage float64
periodRecordCount uint
}
tests := []struct {
name string
fields fields
want int
}{
{
name: "test",
fields: fields{
config: p.config,
fixedQuotient: p.fixedQuotient,
perHourQuotient: p.perHourQuotient,
detectInterval: p.detectInterval,
recordSensitivity: p.recordSensitivity,
maxDiskUsage: p.maxDiskUsage,
periodRecordCount: p.periodRecordCount,
},
want: 0,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p := &auto{
config: tt.fields.config,
fixedQuotient: tt.fields.fixedQuotient,
perHourQuotient: tt.fields.perHourQuotient,
detectInterval: tt.fields.detectInterval,
recordSensitivity: tt.fields.recordSensitivity,
maxDiskUsage: tt.fields.maxDiskUsage,
periodRecordCount: tt.fields.periodRecordCount,
}
p.cleanByDate()
size, num := p.Dirs(t)
t.Logf("size %d,num:%d", size, num)
if num != tt.want {
t.Errorf("unexpected file count after cleanByDate: got %d, want %d", num, tt.want)
}
})
}
}
func BuildFile(t *testing.T) string {
dir, err := ioutil.TempDir("/usr", "testClean")
if err != nil {
t.Error(err)
return ""
}
fmt.Println("tempDir", dir)
for i := 0; i < 100; i++ {
f, err := ioutil.TempFile(dir, "20221227-000820-")
if err != nil {
t.Error(err)
return ""
}
write := bufio.NewWriter(f)
str := "aaaaa"
for j := 0; j < 209716; j++ {
write.WriteString(str)
}
write.Flush()
err = f.Close()
if err != nil {
t.Error(err)
return ""
}
}
return dir
}
func TestAutoOverDiskLimitCapacityClean(t *testing.T) {
dir := BuildFile(t)
defer os.RemoveAll(dir)
p := NewAutoPProf(Config{
PProfProfilePath: dir,
MaxDiskUsage: "50MB",
})
p.init()
size, num := p.Dirs(t)
t.Logf("size %d,num:%d", size, num)
type fields struct {
config Config
fixedQuotient int
perHourQuotient []int
detectInterval time.Duration
recordSensitivity float64
maxDiskUsage float64
periodRecordCount uint
}
tests := []struct {
name string
fields fields
want int
want1 float64
}{
{
name: "test",
fields: fields{
config: p.config,
fixedQuotient: p.fixedQuotient,
perHourQuotient: p.perHourQuotient,
detectInterval: p.detectInterval,
recordSensitivity: p.recordSensitivity,
maxDiskUsage: p.maxDiskUsage,
periodRecordCount: p.periodRecordCount,
},
want: 50,
want1: p.maxDiskUsage,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p := &auto{
config: tt.fields.config,
fixedQuotient: tt.fields.fixedQuotient,
perHourQuotient: tt.fields.perHourQuotient,
detectInterval: tt.fields.detectInterval,
recordSensitivity: tt.fields.recordSensitivity,
maxDiskUsage: tt.fields.maxDiskUsage,
periodRecordCount: tt.fields.periodRecordCount,
}
p.cleanBySpace()
size, num := p.Dirs(t)
t.Logf("size %d,num:%d", size, num)
if float64(size) > tt.want1 || num > tt.want {
t.Errorf("unexpected result after cleanBySpace: size=%d num=%d, want size<=%.0f num<=%d", size, num, tt.want1, tt.want)
}
}
})
}
}
package auto
import (
"path/filepath"
"time"
cgroupStats "linkfog.com/public/lib/cgroup/stats"
"linkfog.com/public/lib/common"
"linkfog.com/public/lib/l"
"linkfog.com/public/lib/unit"
"linkfog.com/public/option"
)
type Config struct {
EnableDetectMem common.EnableType // whether to collect info on abnormal memory usage; enabled by default, optional
PProfProfilePath string // directory for collected pprof data; defaults to "/host/dosec/pprof", optional
DetectInterval time.Duration // interval for sampling current cpu and memory usage; defaults to "20s", optional
DetectCPURateRange time.Duration // window for computing the stable cpu usage ratio; defaults to "5m", optional
PProfCollectDura time.Duration // duration of a single collection; defaults to "1m", optional
RetentionDays int // number of days to keep collected files; defaults to 7, optional
MinTriggerValue string // minimum trigger value; defaults to "500M", optional
MaxRecordNumPerHour uint // maximum number of collections per hour; defaults to 6, optional
RecordSensitivity string // collection sensitivity; defaults to "100M"
MaxDiskUsage string // maximum disk usage; defaults to "6G"
AbnormalGoroutineNum int // goroutine count considered abnormal
}
type auto struct {
config Config // collection settings
fixedQuotient int // fixed quotient, the collection baseline: the lower the sensitivity, the lower the fixed quotient once the minimum trigger value is exceeded, so collections happen more often
cgroup *cgroupStats.Stats // collects cgroup cpu and memory stats
perHourQuotient []int // dynamic quotients recorded within the current hour
detectInterval time.Duration // interval between periodic detections
recordSensitivity float64 // collection sensitivity
maxDiskUsage float64 // maximum disk usage
periodRecordCount uint // per-hour counter
}
func New() *auto {
return NewAutoPProf(Config{})
}
func NewAutoPProf(srcConf Config) *auto {
var defaultConf = Config{
EnableDetectMem: common.Enabled,
PProfProfilePath: filepath.Join(option.HostPrefix, option.NamespacePrefix, option.PProfPrefix),
DetectInterval: 20 * time.Second,
DetectCPURateRange: 5 * time.Minute,
PProfCollectDura: time.Minute,
RetentionDays: 7,
MinTriggerValue: "500M",
MaxRecordNumPerHour: 6,
RecordSensitivity: "100M",
MaxDiskUsage: "6G",
AbnormalGoroutineNum: 150,
}
finalConfig := mergeConfig(defaultConf, srcConf)
l.Info("pprof create cgroup stats")
stats := cgroupStats.NewStats(-1, finalConfig.DetectInterval, finalConfig.DetectCPURateRange)
return &auto{
config: finalConfig,
cgroup: stats,
}
}
func (p *auto) init() error {
err := p.cgroup.TryCgroupPath()
if err != nil {
l.Error("pprof cgroup stats try cgroup path err:", err)
return nil
}
l.Infof("cgroup stats mem dir:%s", p.cgroup.GetMemoryDir())
l.Infof("cgroup stats cpu dir:%s", p.cgroup.GetCPUDir())
l.Infof("cgroup stats io dir:%s", p.cgroup.GetIODir())
minMem, err := unit.ToBytes(p.config.MinTriggerValue)
if err != nil {
return l.WrapError(err)
}
recordSensitivity, err := unit.ToBytes(p.config.RecordSensitivity)
if err != nil {
return l.WrapError(err)
}
maxDiskUsage, err := unit.ToBytes(p.config.MaxDiskUsage)
if err != nil {
return l.WrapError(err)
}
if recordSensitivity < 1 || minMem <= 0 || maxDiskUsage <= 0 || p.config.MaxRecordNumPerHour <= 0 {
return l.WrapError("minTriggerValue or recordSensitivity or maxDiskUsage or maxRecordNumPerHour is abnormal", minMem, recordSensitivity, maxDiskUsage, p.config.MaxRecordNumPerHour)
}
p.maxDiskUsage = float64(maxDiskUsage) // initialize the maximum disk capacity
p.recordSensitivity = float64(recordSensitivity) // initialize the sensitivity
p.fixedQuotient = int(float64(minMem) / p.recordSensitivity) // fixed quotient: minimum trigger memory divided by the sensitivity, compared later against the dynamic quotients
p.detectInterval = time.Hour / time.Duration(p.config.MaxRecordNumPerHour) // initialize the collection interval
p.perHourQuotient = make([]int, 6) // initialize the slice storing per-hour quotients
return nil
}
// notice zero value risk
func mergeConfig(dst, src Config) Config {
err := common.MergeStructExportedFields(&dst, &src)
if err != nil {
l.Error(err)
}
return dst
}
package auto
import (
"linkfog.com/public/lib/common"
"linkfog.com/public/lib/l"
)
// Abnormal collection: the dynamic quotient must exceed the fixed quotient,
// and the same quotient may not be recorded twice within one hour.
func (p *auto) judgeAbnormalMemoryState(mem float64) bool {
if mem == 0 {
l.Warn("collected memory is 0")
return false
}
dynamicQuotient := int(mem / p.recordSensitivity)
if dynamicQuotient > p.fixedQuotient {
if common.InIntList(dynamicQuotient, p.perHourQuotient) {
return false
}
p.perHourQuotient = append(p.perHourQuotient, dynamicQuotient)
l.Debug("abnormal record pprof memory", mem)
return true
}
return false
}
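// Worked example of the quotient check above (numbers illustrative): with
// recordSensitivity = 100MB and fixedQuotient = 5 (i.e. a 500MB minimum
// trigger), mem = 620MB gives dynamicQuotient = 6 > 5, so a collection is
// recorded and 6 is stored; a later 680MB also maps to quotient 6 and is
// suppressed for the rest of the hour, while 710MB (quotient 7) records again.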
func (p *auto) cleanPerHourQuotient() {
p.perHourQuotient = []int{}
}
func (p *auto) judgeAbnormalGoroutineNumState(gn int) bool {
if gn > p.config.AbnormalGoroutineNum {
l.Warn("goroutine num outlier", gn)
return true
}
return false
}
package auto
import (
"testing"
)
func TestAutoAbnormalRecordPprof(t *testing.T) {
type args struct {
mem float64
}
tests := []struct {
name string
p *auto
args args
want bool
setup func(p *auto)
teardown func(p *auto)
}{
{
name: "empty memory",
p: &auto{},
args: args{mem: 0},
want: false,
},
{
name: "empty fixed quotient",
p: &auto{},
args: args{mem: 10},
want: false,
},
{
name: "dynamic quotient greater than fixed quotient",
p: &auto{
recordSensitivity: 1,
fixedQuotient: 2,
perHourQuotient: []int{1},
},
args: args{mem: 3},
want: true,
},
{
name: "dynamic quotient less than fixed quotient",
p: &auto{
recordSensitivity: 1,
fixedQuotient: 2,
perHourQuotient: []int{1},
},
args: args{mem: 1},
want: false,
},
{
name: "dynamic quotient equals to fixed quotient and perHourQuotient contains dynamic quotient",
p: &auto{
recordSensitivity: 1,
fixedQuotient: 2,
perHourQuotient: []int{1, 2},
},
args: args{mem: 1},
want: false,
},
{
name: "dynamic quotient equals to fixed quotient and perHourQuotient doesn't contain dynamic quotient",
p: &auto{
recordSensitivity: 1,
fixedQuotient: 2,
perHourQuotient: []int{1},
},
args: args{mem: 2},
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.setup != nil {
tt.setup(tt.p)
}
got := tt.p.judgeAbnormalMemoryState(tt.args.mem)
if got != tt.want {
t.Errorf("judgeAbnormalMemoryState() = %v, want %v", got, tt.want)
}
})
}
}
package auto
import (
"fmt"
"os"
"runtime"
"time"
"linkfog.com/public/lib/common"
"linkfog.com/public/lib/l"
"linkfog.com/public/lib/pprof/ps"
pruntime "linkfog.com/public/lib/pprof/runtime"
)
func (p *auto) Start() {
if err := p.init(); err != nil {
l.Error(err)
return
}
l.Info(p.config)
go p.startAutoPProf()
l.Info("auto pprof server running")
}
func (p *auto) startAutoPProf() {
if err := p.doStartAutoPProf(); err != nil {
l.Error(err)
}
}
/*
┌────┬────┬────┬────┬────┬────┐
│ 0 │ 1 │ 2 │ 3 │ 4 │ 5 │
0────10───20───30───40───50──60
1. The hour is divided into six slots; slot 0 is the fixed periodic collection point.
2. Every 10 minutes the program moves to the next slot and reads the container memory;
if memory growth exceeds the trigger value, one abnormal collection is performed and
the current abnormal memory value is recorded in the slot.
3. Within each hour, no slot other than slot 0 may record a value within one
sensitivity step of a value already stored in another slot.
Example: minimum trigger value 500, sensitivity 100.
If slot 1 stores 600, slot 2 may only store 700 or more, never a value above 600 but below 700.
*/
func (p *auto) doStartAutoPProf() (err error) {
for {
var isRecord bool
var isMemAbnormal bool
var isGNAbnormal bool
var tag string
if p.periodRecordCount%p.config.MaxRecordNumPerHour == 0 {
isRecord = true
p.cleanPerHourQuotient()
l.Debug("start periodic record pprof")
} else if p.config.EnableDetectMem == common.Enabled {
memoryWorkingSet, err := p.cgroup.GetMemoryWorkingSet()
if err != nil {
return err
}
isMemAbnormal = p.judgeAbnormalMemoryState(memoryWorkingSet)
if isMemAbnormal {
tag = fmt.Sprintf("%.2fMB", float64(memoryWorkingSet)/1024/1024)
}
gn := runtime.NumGoroutine()
isGNAbnormal = p.judgeAbnormalGoroutineNumState(gn)
if isGNAbnormal {
tag = tag + fmt.Sprintf("%dGN", gn)
}
l.Debug("abnormal record pprof file tag", tag)
}
if isRecord || isMemAbnormal || isGNAbnormal {
err = pruntime.StartProfile(pruntime.ModeAuto, p.config.PProfProfilePath, tag, p.config.PProfCollectDura)
if err != nil {
return err
}
procs, err := ps.FindAllContainerProcs(os.Getpid())
if err != nil {
l.Warn("get all container proc err", err)
} else {
ps.Log(procs)
}
}
p.clean()
p.periodRecordCount++
time.Sleep(p.detectInterval)
}
}
package auto
import (
"testing"
"time"
)
func TestAutoDoStartAutoPProf(t *testing.T) {
p := NewAutoPProf(Config{
MinTriggerValue: "10MB",
RecordSensitivity: "1MB",
})
err := p.init()
if err != nil {
t.Error("init err", err)
return
}
type fields struct {
config Config
fixedQuotient int
perHourQuotient []int
detectInterval time.Duration
recordSensitivity float64
maxDiskUsage float64
periodRecordCount uint
}
tests := []struct {
name string
fields fields
wantErr bool
}{
{
name: "normal",
fields: fields{
config: p.config,
fixedQuotient: p.fixedQuotient,
perHourQuotient: p.perHourQuotient,
detectInterval: 1 * time.Second,
recordSensitivity: p.recordSensitivity,
maxDiskUsage: p.maxDiskUsage,
periodRecordCount: p.periodRecordCount,
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p := &auto{
config: tt.fields.config,
fixedQuotient: tt.fields.fixedQuotient,
perHourQuotient: tt.fields.perHourQuotient,
detectInterval: tt.fields.detectInterval,
recordSensitivity: tt.fields.recordSensitivity,
maxDiskUsage: tt.fields.maxDiskUsage,
periodRecordCount: tt.fields.periodRecordCount,
}
if err := p.doStartAutoPProf(); (err != nil) != tt.wantErr {
t.Errorf("doStartAutoPProf() error = %v, wantErr %v", err, tt.wantErr)
}
}
})
}
}
package port
import (
"net/http"
_ "net/http/pprof"
"strconv"
"linkfog.com/public/lib/l"
)
func Start(port int) {
if port == 0 {
l.Debug("pprof switch is off")
return
}
go func() {
addr := "0.0.0.0:" + strconv.Itoa(port)
l.Info("pprof listening on", addr)
// ListenAndServe blocks; it only returns on error
if err := http.ListenAndServe(addr, nil); err != nil {
l.Error("pprof start", err)
}
}()
}
package pprof
import (
"linkfog.com/public/lib/l"
"linkfog.com/public/lib/pprof/auto"
"linkfog.com/public/lib/pprof/unix"
)
func Start() {
autoPProf := auto.New()
if autoPProf != nil {
autoPProf.Start()
} else {
l.Error("auto pprof is nil")
}
unixPProf := unix.New()
unixPProf.Start()
}
package pprof
import (
"testing"
"time"
)
func TestStart(t *testing.T) {
Start()
time.Sleep(10 * time.Second)
}
package ps
import (
"io"
"os"
"strconv"
"syscall"
"linkfog.com/public/lib/l"
"linkfog.com/public/lib/util"
)
const DefaultNamespace = "/ns/ipc"
func FindAllContainerProcs(pid int) (map[int]Proc, error) {
return FindAllProcsWithNamespace(pid, DefaultNamespace)
}
func FindAllProcsWithNamespace(pid int, nsPrefix string) (map[int]Proc, error) {
var hostPrefix = ""
namespace, err := getNamespace(hostPrefix, pid, nsPrefix)
if err != nil {
return nil, l.WrapError(err)
}
procs, err := findProcsWithNamespace(hostPrefix, namespace, nsPrefix)
if err != nil {
return nil, l.WrapError(err)
}
return procs, nil
}
func getNS(path string) (uint32, error) {
var statT syscall.Stat_t
if err := syscall.Stat(path, &statT); err != nil {
return 0, err
}
return uint32(statT.Ino), nil
}
func getNamespace(hostPrefix string, pid int, nsPrefix string) (uint32, error) {
path := hostPrefix + "/proc/" + strconv.Itoa(pid) + nsPrefix
return getNS(path)
}
func getProcPath(hostPrefix string) string {
return hostPrefix + "/proc/"
}
func findProcsWithNamespace(hostPrefix string, namespace uint32, nsPrefix string) (map[int]Proc, error) {
procPath := getProcPath(hostPrefix)
d, err := os.Open(procPath)
if err != nil {
return nil, err
}
defer d.Close()
var procs = map[int]Proc{}
var names []string
var pid int64
var ns uint32
var p Proc
for {
names, err = d.Readdirnames(10)
if err == io.EOF {
break
}
if err != nil {
return nil, err
}
for _, name := range names {
// We only care about names that start with a digit (i.e. PIDs)
if name[0] < '0' || name[0] > '9' {
continue
}
// From this point on we simply ignore errors, because the
// process may no longer exist.
pid, err = strconv.ParseInt(name, 10, 0)
if err != nil {
continue
}
ns, err = getNamespace(hostPrefix, int(pid), nsPrefix)
if err != nil {
if !util.IsNoSuchFileOrDirectory(err) {
l.Error(err)
}
continue
}
if ns != namespace {
continue
}
p, err = PS(int32(pid))
if err != nil {
if !util.IsNoSuchFileOrDirectory(err) {
l.Error(err)
}
continue
}
procs[int(pid)] = p
}
}
return procs, nil
}
package ps
import (
"os"
"testing"
)
func TestPS(t *testing.T) {
ps, err := FindAllContainerProcs(os.Getpid())
if err != nil {
t.Fatal(err)
}
Log(ps)
}