package pluginrpc import ( "fmt" "io" "net" "os" "sync" "time" "google.golang.org/grpc" "linkfog.com/public/lib/file" "linkfog.com/public/lib/l" pb "linkfog.com/pluginx/proto" ) // 插件grpc服务端 type PluginGrpcServer struct { listen net.Listener grpcServer *grpc.Server unixPath string pluginServer pb.PluginServer } func NewPluginGrpcServer(unixPath string, pluginServer pb.PluginServer) (*PluginGrpcServer, error) { os.Remove(unixPath) lis, err := net.Listen("unix", unixPath) if err != nil { return nil, fmt.Errorf("failed to listen %s, err: %v", unixPath, err) } grpcServer := grpc.NewServer() pb.RegisterPluginServer(grpcServer, pluginServer) go func() { grpcServer.Serve(lis) }() // 父进程退出,插件进程自动退出 go func() { initPid := os.Getpid() initPPid := os.Getppid() l.Infof("pname:%s, pid:%d, ppid:%d", os.Args[0], initPid, initPPid) for { time.Sleep(3 * time.Second) curPPid := os.Getppid() if curPPid != initPPid { l.Warnf("pname:%s, pid:%d, ppid changed, initPPid:%d, curPPid:%d, program exit", os.Args[0], initPid, initPPid, curPPid) os.Exit(1) } } }() h := &PluginGrpcServer{ listen: lis, grpcServer: grpcServer, unixPath: unixPath, pluginServer: pluginServer, } return h, nil } func (c *PluginGrpcServer) Close() { c.listen.Close() } type SendFileCallBack func(fs *pb.FileStream, filePath string) func SendFileHelper(stream pb.Plugin_SendFileServer, callback SendFileCallBack) error { f, err := os.CreateTemp("/tmp", "plugin") if err != nil { return fmt.Errorf("create temp file err: %v", err) } defer f.Close() defer file.FadviseSwitch(f) // l.Info("create temp file:", f.Name()) var totalr int64 var firstFS *pb.FileStream for { fs, err := stream.Recv() if err == io.EOF { // l.Info("stream Recv EOF") break } if err != nil { return fmt.Errorf("stream Recv err: %v", err) } // l.Info("recv file stream", fs) if firstFS == nil { firstFS = fs } totalr += int64(len(fs.Data)) n, err := f.Write(fs.Data) if err != nil { return fmt.Errorf("write file err: %v", err) } if n != len(fs.Data) { return fmt.Errorf("unexpected write file, data_size:%d, actual_write_size:%d", len(fs.Data), n) } } if totalr != firstFS.TotalSize { return fmt.Errorf("unexpected EOF, actual_recv_size:%d, expected_total_size:%d", totalr, firstFS.TotalSize) } go callback(firstFS, f.Name()) res := &pb.Res{} res.Header = firstFS.Header from := res.Header.From res.Header.From = res.Header.To res.Header.To = from res.Code = 0 res.Desc = "success" return stream.SendAndClose(res) } func ChatHelper(msg *pb.Req, stream pb.Plugin_ChatServer, res <-chan *pb.Res) error { moduleName := msg.GetHeader().GetTo() var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() for { select { case <-stream.Context().Done(): l.Info("stream closed", moduleName) return case sig, ok := <-res: if !ok { l.Warn("sig chan closed", moduleName) return } l.Infof(" send message: %v", msg) if err := stream.Send(sig); err != nil { l.Error(err) return } } } }() wg.Wait() return nil }