plugin_grpc_server.go 2.61 KB
Newer Older
“李磊”'s avatar
“李磊” committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package pluginrpc

import (
	"fmt"
	"io"
	"net"
	"os"
	"time"

	"google.golang.org/grpc"

	"linkfog.com/public/lib/file"
	"linkfog.com/public/lib/l"

	pb "linkfog.com/pluginx/proto"
)

// PluginGrpcServer is the plugin-side gRPC server. It bundles the unix-socket
// listener, the grpc.Server serving on it, and the pb.PluginServer
// implementation registered with that server.
type PluginGrpcServer struct {
	// listen is the unix-socket listener the gRPC server accepts on.
	listen       net.Listener
	// grpcServer is the gRPC server that pluginServer is registered with.
	grpcServer   *grpc.Server
	// unixPath is the filesystem path of the unix socket.
	unixPath     string
	// pluginServer is the service implementation handling plugin RPCs.
	pluginServer pb.PluginServer
}

// NewPluginGrpcServer starts a gRPC server for pluginServer on the unix socket
// at unixPath and returns a handle to it.
//
// Any existing file at unixPath is removed before listening. The server is
// served from a background goroutine. A second background goroutine polls the
// parent process ID every 3 seconds and terminates the whole process with
// exit status 1 as soon as it differs from the one observed at start-up, so
// that the plugin process does not outlive the parent that launched it.
//
// It returns an error if listening on unixPath fails.
func NewPluginGrpcServer(unixPath string, pluginServer pb.PluginServer) (*PluginGrpcServer, error) {
	// Best effort: clear a stale socket file left by a previous run. If the
	// path still cannot be used, net.Listen below reports the failure.
	_ = os.Remove(unixPath)
	lis, err := net.Listen("unix", unixPath)
	if err != nil {
		return nil, fmt.Errorf("failed to listen %s, err: %w", unixPath, err)
	}

	grpcServer := grpc.NewServer()
	pb.RegisterPluginServer(grpcServer, pluginServer)
	go func() {
		// Serve blocks until the listener fails or the server is stopped;
		// surface the reason instead of dropping it silently.
		if err := grpcServer.Serve(lis); err != nil {
			l.Warnf("pname:%s, grpc serve on %s exit, err: %v", os.Args[0], unixPath, err)
		}
	}()

	// Exit the plugin process automatically when the parent process exits.
	// The initial IDs are captured before the goroutine is scheduled so that a
	// parent exit in between is still detected as a change.
	initPid := os.Getpid()
	initPPid := os.Getppid()
	go func() {
		l.Infof("pname:%s, pid:%d, ppid:%d", os.Args[0], initPid, initPPid)
		for {
			time.Sleep(3 * time.Second)
			curPPid := os.Getppid()
			if curPPid != initPPid {
				// The original parent is gone (the process was re-parented).
				l.Warnf("pname:%s, pid:%d, ppid changed, initPPid:%d, curPPid:%d, program exit",
					os.Args[0], initPid, initPPid, curPPid)
				os.Exit(1)
			}
		}
	}()

	h := &PluginGrpcServer{
		listen:       lis,
		grpcServer:   grpcServer,
		unixPath:     unixPath,
		pluginServer: pluginServer,
	}
	return h, nil
}

// Close shuts the plugin gRPC server down. It stops the grpc.Server, which
// closes its listeners and open connections, and then closes the listener
// held by c in case the server had not taken ownership of it yet.
func (c *PluginGrpcServer) Close() {
	// Closing only the listener would leave accepted connections and their
	// serving goroutines alive, so stop the server itself first.
	c.grpcServer.Stop()
	// The listener is normally already closed by Stop; the resulting
	// "already closed" error is expected and deliberately ignored.
	_ = c.listen.Close()
}

// SendFileCallBack is the callback SendFileHelper invokes, in its own
// goroutine, after a file has been received completely. fs is the first
// FileStream message received on the stream and filePath is the path of the
// temporary file holding the received data.
type SendFileCallBack func(fs *pb.FileStream, filePath string)

// SendFileHelper receives a client-streamed file from stream, writes it to a
// temporary file under /tmp, and answers the stream with a success response.
//
// Every received chunk is appended to the temporary file. When the stream
// ends, the number of bytes received is compared with the TotalSize announced
// in the first message. On success, callback is started in a new goroutine
// with the first message and the temporary file path, and a response whose
// header has From and To swapped is sent with SendAndClose.
//
// An error is returned if the temporary file cannot be created, if receiving
// or writing fails, if the stream ends before any message arrives, or if the
// received size does not match TotalSize. On all of these error paths the
// temporary file is removed; once callback has been started the file is left
// in place for it.
func SendFileHelper(stream pb.Plugin_SendFileServer, callback SendFileCallBack) error {
	f, err := os.CreateTemp("/tmp", "plugin")
	if err != nil {
		return fmt.Errorf("create temp file err: %v", err)
	}
	// handedOff becomes true once the temp file has been passed to callback;
	// until then this function owns the file and must not leave it behind.
	handedOff := false
	defer func() {
		if !handedOff {
			_ = os.Remove(f.Name())
		}
	}()
	// Deferred calls run in reverse order: FadviseSwitch, then Close, then
	// the clean-up above.
	defer f.Close()
	defer file.FadviseSwitch(f)

	var totalr int64
	var firstFS *pb.FileStream
	for {
		fs, err := stream.Recv()
		if err == io.EOF {
			// The client finished sending.
			break
		}
		if err != nil {
			return fmt.Errorf("stream Recv err: %v", err)
		}

		// The first message supplies the header and expected total size.
		if firstFS == nil {
			firstFS = fs
		}
		totalr += int64(len(fs.Data))

		n, err := f.Write(fs.Data)
		if err != nil {
			return fmt.Errorf("write file err: %v", err)
		}
		if n != len(fs.Data) {
			return fmt.Errorf("unexpected write file, data_size:%d, actual_write_size:%d", len(fs.Data), n)
		}
	}

	// A stream closed before any message leaves firstFS nil; report it
	// instead of dereferencing a nil pointer below.
	if firstFS == nil {
		return fmt.Errorf("unexpected EOF, no file stream received")
	}

	if totalr != firstFS.TotalSize {
		return fmt.Errorf("unexpected EOF, actual_recv_size:%d, expected_total_size:%d",
			totalr, firstFS.TotalSize)
	}

	// From here on the temp file belongs to callback.
	handedOff = true
	go callback(firstFS, f.Name())

	// NOTE(review): if Header is a pointer shared with firstFS, the From/To
	// swap below also changes what the concurrently running callback sees in
	// firstFS.Header — confirm whether callback reads the header.
	res := &pb.Res{}
	res.Header = firstFS.Header
	from := res.Header.From
	res.Header.From = res.Header.To
	res.Header.To = from
	res.Code = 0
	res.Desc = "success"
	return stream.SendAndClose(res)
}