Commit 897ad942 authored by “李磊”'s avatar “李磊”
Browse files

Initial commit

parents
example/plugin_rpc/client/client
example/plugin_rpc/server/server
example/plugin_mgr/plugin_mgr
# pluginx
插件库,提供插件管理和插件grpc通信能力
package main
import (
"time"
"linkfog.com/pluginx/pluginmgr"
"linkfog.com/public/lib/l"
)
func main() {
pluginConfig := `{
"testa": {
"name": "testa",
"path": "testdata/testa.sh",
"enable": true,
"mem": 419430400,
"md5": "uninitialized"
},
"testb": {
"name": "testb",
"path": "testdata/testb.sh",
"enable": true,
"mem": 419430400,
"md5": "uninitialized"
}
}`
cfg, err := pluginmgr.LoadPluginConfigWithData([]byte(pluginConfig))
if err != nil {
l.Error("LoadPluginConfigWithData err:", err)
return
}
for k, v := range cfg {
l.Infof("plugin:%s, config:%v", k, v)
}
pluginmgr.EnableValidatePluginMD5 = false
mgr := pluginmgr.NewProcessMgr(cfg)
mgr.Start()
time.Sleep(60 * time.Second)
mgr.Stop()
}
#!/bin/bash
while true
do
echo "`date +'%F %T'` testa.sh is running"
sleep 1
done
\ No newline at end of file
#!/bin/bash
while true
do
echo "`date +'%F %T'` testb.sh is running"
sleep 1
done
\ No newline at end of file
# example
插件客户端和服务端交互实例
# 用法
分为server和client,直接编译运行即可
server
```
$ cd server
$ go build
$ ./server
2023-08-04 19:29:58 [INFO] server.go:30 recv file header: UUID:"e4b1336d-3f13-4560-8a05-1d3d780b378b" Timestamp:1691148598 From:"test-plugin" To:"agent" Func:"SendFile"
2023-08-04 19:29:58 [INFO] server.go:31 recv file name:client, purpose:testdata, size:12447406, path:/tmp/plugin2316397492
```
client
```
$ cd client
$ go build
$ ./client
2023-08-04 19:29:58 [INFO] client.go:29 Call req header: UUID:"ef30b661-dfb1-462b-ae88-c3667014c323" Timestamp:1691148598 From:"agent" To:"test-plugin" Func:"Ping"
2023-08-04 19:29:58 [INFO] client.go:30 Call req data: ping
2023-08-04 19:29:58 [INFO] client.go:38 Call res header: UUID:"ef30b661-dfb1-462b-ae88-c3667014c323" Timestamp:1691148598 From:"test-plugin" To:"agent" Func:"Ping"
2023-08-04 19:29:58 [INFO] client.go:39 Call res code:0, desc:success, data:pong
2023-08-04 19:29:58 [INFO] client.go:45 SendFile req header: UUID:"e4b1336d-3f13-4560-8a05-1d3d780b378b" Timestamp:1691148598 From:"agent" To:"test-plugin" Func:"SendFile"
2023-08-04 19:29:58 [INFO] client.go:46 SendFile req name:client, purpose:testdata
2023-08-04 19:29:58 [INFO] client.go:54 SendFile res header: UUID:"e4b1336d-3f13-4560-8a05-1d3d780b378b" Timestamp:1691148598 From:"test-plugin" To:"agent" Func:"SendFile"
2023-08-04 19:29:58 [INFO] client.go:55 SendFile res code:0, desc:success, data:
```
package main
import (
"context"
"path/filepath"
"time"
"linkfog.com/public/lib/l"
"linkfog.com/pluginx/pluginrpc"
)
func main() {
pluginConf := make(map[string]string)
pluginConf["test-plugin"] = "/tmp/unix_domain.sock"
client, err := pluginrpc.NewPluginGrpcClient(pluginConf)
if err != nil {
l.Error("NewGrpcClientHelper err:", err)
return
}
defer client.Close()
call(client)
sendFile(client)
}
func call(client *pluginrpc.PluginGrpcClient) {
req := pluginrpc.NewReq("agent", "test-plugin", "Ping", []byte("ping"))
l.Info("Call req header:", req.Header)
l.Info("Call req data:", string(req.Data))
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
res, err := client.Call(ctx, req)
if err != nil {
l.Error("Call err:", err)
return
}
l.Info("Call res header:", res.Header)
l.Infof("Call res code:%d, desc:%s, data:%s", res.Code, res.Desc, string(res.Data))
}
func sendFile(client *pluginrpc.PluginGrpcClient) {
filePath := "client"
fs := pluginrpc.NewFileStream("agent", "test-plugin", "SendFile", filepath.Base(filePath), "testdata")
l.Info("SendFile req header:", fs.Header)
l.Infof("SendFile req name:%s, purpose:%s", fs.Name, fs.Purpose)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
res, err := client.SendFile(ctx, fs, filePath)
if err != nil {
l.Error("SendFile err:", err)
return
}
l.Info("SendFile res header:", res.Header)
l.Infof("SendFile res code:%d, desc:%s, data:%s", res.Code, res.Desc, string(res.Data))
}
package main
import (
"context"
"linkfog.com/pluginx/pluginrpc"
pb "linkfog.com/pluginx/proto"
"linkfog.com/public/lib/l"
)
type pluginServer struct {
pb.UnimplementedPluginServer
}
func newServer() *pluginServer {
s := &pluginServer{}
return s
}
func (s *pluginServer) Call(ctx context.Context, req *pb.Req) (*pb.Res, error) {
res := pluginrpc.NewRes(req, 0, "success", []byte("pong"))
return res, nil
}
func (s *pluginServer) SendFile(stream pb.Plugin_SendFileServer) error {
return pluginrpc.SendFileHelper(stream, SendFileHandle)
}
func SendFileHandle(fs *pb.FileStream, filePath string) {
l.Info("recv file header:", fs.Header)
l.Infof("recv file name:%s, purpose:%s, size:%d, path:%s", fs.Name, fs.Purpose, fs.TotalSize, filePath)
}
func main() {
pluginServer := newServer()
grpcServerHelper, err := pluginrpc.NewPluginGrpcServer("/tmp/unix_domain.sock", pluginServer)
if err != nil {
l.Error("NewPluginGrpcServer err:", err)
return
}
defer grpcServerHelper.Close()
select {}
}
module linkfog.com/pluginx
go 1.23.2
replace linkfog.com/public => web.lueluesay.top/git/lil/public v0.0.0-20241011060607-b0e6d54ac601
require (
github.com/golang/protobuf v1.5.4
github.com/satori/go.uuid v1.2.0
github.com/shirou/gopsutil v3.21.11+incompatible
google.golang.org/grpc v1.67.1
linkfog.com/public v0.0.0-00010101000000-000000000000
)
require (
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/stretchr/testify v1.9.0 // indirect
github.com/tklauser/go-sysconf v0.3.14 // indirect
github.com/tklauser/numcpus v0.8.0 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
golang.org/x/net v0.28.0 // indirect
golang.org/x/sys v0.26.0 // indirect
golang.org/x/text v0.17.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 // indirect
google.golang.org/protobuf v1.34.2 // indirect
)
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI=
github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/tklauser/go-sysconf v0.3.14 h1:g5vzr9iPFFz24v2KZXs/pvpvh8/V9Fw6vQK5ZZb78yU=
github.com/tklauser/go-sysconf v0.3.14/go.mod h1:1ym4lWMLUOhuBOPGtRcJm7tEGX4SCYNEEEtghGG/8uY=
github.com/tklauser/numcpus v0.8.0 h1:Mx4Wwe/FjZLeQsK/6kt2EOepwwSl7SmJrK5bV/dXYgY=
github.com/tklauser/numcpus v0.8.0/go.mod h1:ZJZlAY+dmR4eut8epnzf0u/VwodKmryxR8txiloSqBE=
github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0=
github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE=
golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg=
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo=
golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc=
golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 h1:e7S5W7MGGLaSu8j3YjdezkZ+m1/Nm0uRVRMEMGk26Xs=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU=
google.golang.org/grpc v1.67.1 h1:zWnc1Vrcno+lHZCOofnIMvycFcc0QRGIzm9dhnDX68E=
google.golang.org/grpc v1.67.1/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
web.lueluesay.top/git/lil/public v0.0.0-20241011060607-b0e6d54ac601 h1:hvKpgnKT1A6frRqKUgAcvfem3o2JVJVJwktwu5/ZcJs=
web.lueluesay.top/git/lil/public v0.0.0-20241011060607-b0e6d54ac601/go.mod h1:x/nRP9pMRVToI9Te1TazybP0Qlj3V+/aA2EiPQEvzsI=
package pluginmgr
import (
"fmt"
"os"
"os/exec"
"sync"
"syscall"
"time"
gopsutilProcess "github.com/shirou/gopsutil/process"
"linkfog.com/public/lib/file"
"linkfog.com/public/lib/l"
)
var EnableValidatePluginMD5 = true
type pluginProcess struct {
conf *PluginProcessConf
cmd *exec.Cmd
done chan struct{} // for process exit (wait done)
mu *sync.Mutex // for shutdown mutex
}
type PluginProcessConf struct {
Name string `json:"name"`
Path string `json:"path"`
Enable bool `json:"enable"`
Mem uint64 `json:"mem"`
MD5 string `json:"md5"`
}
func newProcess(conf *PluginProcessConf) (*pluginProcess, error) {
proc := &pluginProcess{
conf: conf,
done: make(chan struct{}),
mu: &sync.Mutex{},
}
err := proc.start()
if err != nil {
return nil, err
}
return proc, nil
}
func (p *pluginProcess) start() error {
l.Infof("start plugin process:%s, path:%s", p.conf.Name, p.conf.Path)
if !pathExists(p.conf.Path) {
return fmt.Errorf("plugin process not exist, path:%s", p.conf.Path)
}
if EnableValidatePluginMD5 {
md5, err := file.GetFileMD5(p.conf.Path)
if err != nil {
return err
}
if md5 != p.conf.MD5 {
return fmt.Errorf("plugin process %s md5 inconsistent, actual:%s, expected:%s", p.conf.Name, md5, p.conf.MD5)
}
}
cmd := exec.Command(p.conf.Path)
cmd.Stderr = os.Stderr
cmd.Stdout = os.Stdout
err := cmd.Start()
if err != nil {
return err
}
p.cmd = cmd
go func() {
err = p.cmd.Wait()
if err != nil {
l.Infof("plugin process %s has exited with error:%v, code:%d", p.conf.Name, err, p.cmd.ProcessState.ExitCode())
} else {
l.Infof("plugin process %s has exited with code:%d", p.conf.Name, p.cmd.ProcessState.ExitCode())
}
close(p.done)
}()
return nil
}
func (p *pluginProcess) shutdown() {
p.mu.Lock()
defer p.mu.Unlock()
l.Info("shutdown plugin process", p.conf.Name)
if p.isExited() {
l.Infof("plugin process %s has exited", p.conf.Name)
return
}
select {
case <-time.After(time.Second * 1):
l.Infof("because of plugin process %s exit's timeout, will kill it", p.conf.Name)
syscall.Kill(p.cmd.Process.Pid, syscall.SIGKILL)
<-p.done // after kill, wait exit
l.Infof("plugin process %s has been killed", p.conf.Name)
case <-p.done: // has exited
l.Infof("plugin process %s has been shutdown gracefully", p.conf.Name)
}
}
func (p *pluginProcess) pid() int {
return p.cmd.Process.Pid
}
func (p *pluginProcess) isExited() bool {
return p.cmd.ProcessState != nil
}
func GetProcRSS(pid int) (rss uint64, err error) {
var p *gopsutilProcess.Process
p, err = gopsutilProcess.NewProcess(int32(pid))
if err != nil {
return
}
var memInfo *gopsutilProcess.MemoryInfoStat
memInfo, err = p.MemoryInfo()
if err != nil {
return
}
rss = memInfo.RSS
return
}
func pathExists(path string) bool {
_,err := os.Stat(path)
return err == nil || os.IsExist(err)
}
package pluginmgr
import (
"sync"
"time"
"linkfog.com/public/lib/l"
)
type PluginProcessMgr struct {
PluginProcessConfMap map[string]*PluginProcessConf
pluginProcessMap map[string]*pluginProcess
mu *sync.Mutex // for PluginProcessConfMap, pluginProcessMap
done chan struct{}
doneResp chan struct{}
isRunning bool
sync.Mutex // for Start, Stop
}
func NewProcessMgr(conf map[string]*PluginProcessConf) *PluginProcessMgr {
mgr := &PluginProcessMgr{
PluginProcessConfMap: conf,
pluginProcessMap: make(map[string]*pluginProcess),
mu: &sync.Mutex{},
}
return mgr
}
func (mgr *PluginProcessMgr) Start() {
mgr.Lock()
defer mgr.Unlock()
if mgr.isRunning {
return
}
mgr.isRunning = true
mgr.done = make(chan struct{})
mgr.doneResp = make(chan struct{})
l.Info("plugin process mgr start")
go mgr.run()
}
func (mgr *PluginProcessMgr) Stop() {
mgr.Lock()
defer mgr.Unlock()
if !mgr.isRunning {
return
}
start := time.Now()
mgr.isRunning = false
close(mgr.done)
<-mgr.doneResp
mgr.mu.Lock()
defer mgr.mu.Unlock()
mgr.unregisterAllProcess()
mgr.PluginProcessConfMap = make(map[string]*PluginProcessConf)
mgr.pluginProcessMap = make(map[string]*pluginProcess)
l.Infof("plugin process mgr has stopped, cost %v", time.Since(start))
}
func (mgr *PluginProcessMgr) run() {
mgr.detection()
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
defer close(mgr.doneResp)
for {
select {
case <-ticker.C:
mgr.detection()
case <-mgr.done:
l.Info("plugin process mgr received end signal")
return
}
}
}
func (mgr *PluginProcessMgr) detection() {
mgr.mu.Lock()
defer mgr.mu.Unlock()
// 基于进程map和配置map,清除无用的插件进程
for name, p := range mgr.pluginProcessMap {
if _, ok := mgr.PluginProcessConfMap[name]; !ok {
l.Infof("unregister useless plugin process %s", name)
mgr.unregisterProcess(name, p)
}
}
// 基于配置map和进程map,确保进程状态与配置完全同步
for _, conf := range mgr.PluginProcessConfMap {
if !conf.Enable {
// 插件进程不可用,但进程存在,需要关闭
if p, ok := mgr.pluginProcessMap[conf.Name]; ok {
l.Infof("plugin process %s disable, unregister", conf.Name)
mgr.unregisterProcess(conf.Name, p)
}
continue
}
// 插件进程不存在,启动
p, ok := mgr.pluginProcessMap[conf.Name]
if !ok {
l.Infof("register plugin process %s", conf.Name)
mgr.registerProcess(conf)
continue
}
// 插件进程异常退出,重启
if p.isExited() {
l.Warnf("plugin process %s abnormal exited, restart", conf.Name)
mgr.registerProcess(conf)
continue
}
// 获取插件进程内存rss
rss, err := GetProcRSS(p.pid())
if err != nil {
l.Warnf("get plugin process %s memory rss err: %v", conf.Name, err)
continue
}
l.Infof("plugin process %s memory rss: %d", conf.Name, rss)
// 插件进程内存rss达到限制,重启
if rss > conf.Mem {
l.Warn("plugin process %s memory rss(%d) reach limit(%d), restart", conf.Name, rss, conf.Mem)
p.shutdown()
mgr.registerProcess(conf)
}
}
}
func (mgr *PluginProcessMgr) ReloadConf(conf map[string]*PluginProcessConf) {
mgr.mu.Lock()
defer mgr.mu.Unlock()
mgr.PluginProcessConfMap = conf
}
func (mgr *PluginProcessMgr) registerProcess(conf *PluginProcessConf) {
p, err := newProcess(conf)
if err != nil {
l.Errorf("new plugin process %s err: %v", conf.Name, err)
return
}
mgr.pluginProcessMap[conf.Name] = p
}
func (mgr *PluginProcessMgr) unregisterProcess(name string, p *pluginProcess) {
p.shutdown()
if !p.isExited() {
l.Error("shutdown plugin process failed")
return
}
delete(mgr.pluginProcessMap, name)
}
func (mgr *PluginProcessMgr) unregisterAllProcess() {
l.Infof("shutdown all plugin process")
for name, p := range mgr.pluginProcessMap {
l.Infof("shutdown plugin process %s", name)
mgr.unregisterProcess(name, p)
}
}
package pluginmgr
import (
"encoding/json"
"fmt"
"os"
)
func LoadPluginConfigWithFile(filePath string) (map[string]*PluginProcessConf, error) {
data, err := os.ReadFile(filePath) // TODO: change to util.ReadFile
if err != nil {
return nil, err
}
return LoadPluginConfigWithData(data)
}
func LoadPluginConfigWithData(data []byte) (map[string]*PluginProcessConf, error) {
plgCfgMap := make(map[string]*PluginProcessConf)
err := json.Unmarshal(data, &plgCfgMap)
if err != nil {
return nil, fmt.Errorf("json unmarshal plugin config err: %v", err)
}
return plgCfgMap, nil
}
package pluginrpc
import (
"context"
"fmt"
"io"
"os"
"strings"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"linkfog.com/public/lib/file"
pb "linkfog.com/pluginx/proto"
)
var (
FileTransferBlockSize = 102400 // unit:bytes
FileTransferTimeout = 30 // unit:minutes
)
// 插件grpc客户端
type PluginGrpcClient struct {
pluginMap map[string]*pluginClientConn
pluginConf map[string]string // key:name value:unixPath
}
type pluginClientConn struct {
conn *grpc.ClientConn
client pb.PluginClient
}
func NewPluginGrpcClient(pluginConf map[string]string) (*PluginGrpcClient, error) {
h := &PluginGrpcClient{
pluginMap: make(map[string]*pluginClientConn),
pluginConf: pluginConf,
}
for name, unixPath := range pluginConf {
if !strings.HasPrefix(unixPath, "unix:") {
unixPath = "unix:" + unixPath
}
conn, err := grpc.Dial(unixPath,
grpc.WithTransportCredentials(insecure.NewCredentials()),
// grpc.WithKeepaliveParams(keepalive.ClientParameters{
// Time: 10 * time.Second,
// Timeout: 20 * time.Second,
// }),
)
if err != nil {
h.Close()
return nil, fmt.Errorf("failed to dial %s, err:%v", unixPath, err)
}
client := pb.NewPluginClient(conn)
h.pluginMap[name] = &pluginClientConn{conn: conn, client: client}
}
return h, nil
}
func (h *PluginGrpcClient) Close() {
for _, cc := range h.pluginMap {
cc.conn.Close()
}
}
func (c *PluginGrpcClient) Call(ctx context.Context, req *pb.Req) (*pb.Res, error) {
// 查找目的插件对应的client
dstPlugin := req.Header.To
cc, ok := c.pluginMap[dstPlugin]
if !ok {
return nil, fmt.Errorf("dst plugin conn %s not found", dstPlugin)
}
return cc.client.Call(ctx, req)
}
func (c *PluginGrpcClient) SendFile(ctx context.Context, fs *pb.FileStream, filePath string) (*pb.Res, error) {
// 查找目的插件对应的client
dstPlugin := fs.Header.To
cc, ok := c.pluginMap[dstPlugin]
if !ok {
return nil, fmt.Errorf("dst plugin conn %s not found", dstPlugin)
}
info, err := os.Stat(filePath)
if err != nil {
return nil, fmt.Errorf("stat file err: %v", err)
}
if fs.TotalSize == 0 {
fs.TotalSize = info.Size()
}
if fs.TotalPart == 0 {
fs.TotalPart = calcTotalPart(info.Size(), int64(FileTransferBlockSize))
}
stream, err := cc.client.SendFile(ctx)
if err != nil {
return nil, fmt.Errorf("client.SendFile err: %v", err)
}
fs.Part = 0
err = sendFileByPart(filePath, func(part []byte) error {
fs.Data = part
fs.Part++
err = stream.Send(fs)
if err != nil {
return fmt.Errorf("stream send err: %v", err)
}
return nil
})
if err != nil {
return nil, fmt.Errorf("sendFileByPart err: %v", err)
}
res, err := stream.CloseAndRecv()
if err != nil {
return nil, fmt.Errorf("stream CloseAndRecv err: %v", err)
}
return res, nil
}
func (c *PluginGrpcClient) Chat(ctx context.Context, req *pb.Req) (pb.Plugin_ChatClient, error) {
// 查找目的插件对应的client
dstPlugin := req.Header.To
cc, ok := c.pluginMap[dstPlugin]
if !ok {
return nil, fmt.Errorf("dst plugin conn %s not found", dstPlugin)
}
return cc.client.Chat(ctx, req)
}
func sendFileByPart(filePath string, send func([]byte) error) error {
f, err := os.Open(filePath)
if err != nil {
return fmt.Errorf("open file err: %v", err)
}
defer f.Close()
defer file.FadviseSwitch(f)
var buf = make([]byte, FileTransferBlockSize)
var finished = false
for offset := int64(0); ; offset += int64(FileTransferBlockSize) {
readLength, err := f.ReadAt(buf, offset)
if err == io.EOF {
finished = true
} else if err != nil {
return fmt.Errorf("err occured when reading file: %s, err: %v", filePath, err)
}
if readLength == 0 {
break
}
if readLength != FileTransferBlockSize {
// trailing garbage
buf = buf[:readLength]
}
err = send(buf)
if err != nil {
return fmt.Errorf("send err: %v", err)
}
if finished {
break
}
}
return nil
}
func calcTotalPart(totalSize, partSize int64) int64 {
if totalSize%partSize == 0 {
return totalSize / partSize
}
return totalSize/partSize + 1
}
package pluginrpc
import (
"fmt"
"io"
"net"
"os"
"time"
"google.golang.org/grpc"
"linkfog.com/public/lib/file"
"linkfog.com/public/lib/l"
pb "linkfog.com/pluginx/proto"
)
// 插件grpc服务端
type PluginGrpcServer struct {
listen net.Listener
grpcServer *grpc.Server
unixPath string
pluginServer pb.PluginServer
}
func NewPluginGrpcServer(unixPath string, pluginServer pb.PluginServer) (*PluginGrpcServer, error) {
os.Remove(unixPath)
lis, err := net.Listen("unix", unixPath)
if err != nil {
return nil, fmt.Errorf("failed to listen %s, err: %v", unixPath, err)
}
grpcServer := grpc.NewServer()
pb.RegisterPluginServer(grpcServer, pluginServer)
go func() {
grpcServer.Serve(lis)
}()
// 父进程退出,插件进程自动退出
go func() {
initPid := os.Getpid()
initPPid := os.Getppid()
l.Infof("pname:%s, pid:%d, ppid:%d", os.Args[0], initPid, initPPid)
for {
time.Sleep(3 * time.Second)
curPPid := os.Getppid()
if curPPid != initPPid {
l.Warnf("pname:%s, pid:%d, ppid changed, initPPid:%d, curPPid:%d, program exit",
os.Args[0], initPid, initPPid, curPPid)
os.Exit(1)
}
}
}()
h := &PluginGrpcServer{
listen: lis,
grpcServer: grpcServer,
unixPath: unixPath,
pluginServer: pluginServer,
}
return h, nil
}
func (c *PluginGrpcServer) Close() {
c.listen.Close()
}
type SendFileCallBack func(fs *pb.FileStream, filePath string)
func SendFileHelper(stream pb.Plugin_SendFileServer, callback SendFileCallBack) error {
f, err := os.CreateTemp("/tmp", "plugin")
if err != nil {
return fmt.Errorf("create temp file err: %v", err)
}
defer f.Close()
defer file.FadviseSwitch(f)
// l.Info("create temp file:", f.Name())
var totalr int64
var firstFS *pb.FileStream
for {
fs, err := stream.Recv()
if err == io.EOF {
// l.Info("stream Recv EOF")
break
}
if err != nil {
return fmt.Errorf("stream Recv err: %v", err)
}
// l.Info("recv file stream", fs)
if firstFS == nil {
firstFS = fs
}
totalr += int64(len(fs.Data))
n, err := f.Write(fs.Data)
if err != nil {
return fmt.Errorf("write file err: %v", err)
}
if n != len(fs.Data) {
return fmt.Errorf("unexpected write file, data_size:%d, actual_write_size:%d", len(fs.Data), n)
}
}
if totalr != firstFS.TotalSize {
return fmt.Errorf("unexpected EOF, actual_recv_size:%d, expected_total_size:%d",
totalr, firstFS.TotalSize)
}
go callback(firstFS, f.Name())
res := &pb.Res{}
res.Header = firstFS.Header
from := res.Header.From
res.Header.From = res.Header.To
res.Header.To = from
res.Code = 0
res.Desc = "success"
return stream.SendAndClose(res)
}
package pluginrpc
import (
"time"
uuidgen "github.com/satori/go.uuid"
pb "linkfog.com/pluginx/proto"
)
func NewReq(from, to, function string, data []byte) *pb.Req {
req := &pb.Req{}
req.Header = &pb.Header{
UUID: uuidgen.NewV4().String(),
Timestamp: time.Now().Unix(),
From: from,
To: to,
Func: function,
}
req.Data = data
return req
}
func NewFileStream(from, to, function, name, purpose string) *pb.FileStream {
fs := &pb.FileStream{}
fs.Header = &pb.Header{
UUID: uuidgen.NewV4().String(),
Timestamp: time.Now().Unix(),
From: from,
To: to,
Func: function,
}
fs.Name = name
fs.Purpose = purpose
return fs
}
func NewRes(req *pb.Req, code int32, desc string, data []byte) *pb.Res {
res := &pb.Res{}
res.Header = &pb.Header{
UUID: req.Header.UUID,
Timestamp: time.Now().Unix(),
From: req.Header.To,
To: req.Header.From,
Func: req.Header.Func,
}
res.Code = code
res.Desc = desc
res.Data = data
return res
}
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: plugin.proto
package proto
import (
context "context"
fmt "fmt"
proto "github.com/golang/protobuf/proto"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
math "math"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
type Req struct {
Header *Header `protobuf:"bytes,1,opt,name=Header,proto3" json:"Header,omitempty"`
Data []byte `protobuf:"bytes,2,opt,name=Data,proto3" json:"Data,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Req) Reset() { *m = Req{} }
func (m *Req) String() string { return proto.CompactTextString(m) }
func (*Req) ProtoMessage() {}
func (*Req) Descriptor() ([]byte, []int) {
return fileDescriptor_22a625af4bc1cc87, []int{0}
}
func (m *Req) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Req.Unmarshal(m, b)
}
func (m *Req) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Req.Marshal(b, m, deterministic)
}
func (m *Req) XXX_Merge(src proto.Message) {
xxx_messageInfo_Req.Merge(m, src)
}
func (m *Req) XXX_Size() int {
return xxx_messageInfo_Req.Size(m)
}
func (m *Req) XXX_DiscardUnknown() {
xxx_messageInfo_Req.DiscardUnknown(m)
}
var xxx_messageInfo_Req proto.InternalMessageInfo
func (m *Req) GetHeader() *Header {
if m != nil {
return m.Header
}
return nil
}
func (m *Req) GetData() []byte {
if m != nil {
return m.Data
}
return nil
}
type Res struct {
Header *Header `protobuf:"bytes,1,opt,name=Header,proto3" json:"Header,omitempty"`
Code int32 `protobuf:"varint,2,opt,name=Code,proto3" json:"Code,omitempty"`
Desc string `protobuf:"bytes,3,opt,name=Desc,proto3" json:"Desc,omitempty"`
Data []byte `protobuf:"bytes,4,opt,name=Data,proto3" json:"Data,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Res) Reset() { *m = Res{} }
func (m *Res) String() string { return proto.CompactTextString(m) }
func (*Res) ProtoMessage() {}
func (*Res) Descriptor() ([]byte, []int) {
return fileDescriptor_22a625af4bc1cc87, []int{1}
}
func (m *Res) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Res.Unmarshal(m, b)
}
func (m *Res) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Res.Marshal(b, m, deterministic)
}
func (m *Res) XXX_Merge(src proto.Message) {
xxx_messageInfo_Res.Merge(m, src)
}
func (m *Res) XXX_Size() int {
return xxx_messageInfo_Res.Size(m)
}
func (m *Res) XXX_DiscardUnknown() {
xxx_messageInfo_Res.DiscardUnknown(m)
}
var xxx_messageInfo_Res proto.InternalMessageInfo
func (m *Res) GetHeader() *Header {
if m != nil {
return m.Header
}
return nil
}
func (m *Res) GetCode() int32 {
if m != nil {
return m.Code
}
return 0
}
func (m *Res) GetDesc() string {
if m != nil {
return m.Desc
}
return ""
}
func (m *Res) GetData() []byte {
if m != nil {
return m.Data
}
return nil
}
type FileStream struct {
Header *Header `protobuf:"bytes,1,opt,name=Header,proto3" json:"Header,omitempty"`
Name string `protobuf:"bytes,2,opt,name=Name,proto3" json:"Name,omitempty"`
Purpose string `protobuf:"bytes,3,opt,name=Purpose,proto3" json:"Purpose,omitempty"`
TotalSize int64 `protobuf:"varint,4,opt,name=TotalSize,proto3" json:"TotalSize,omitempty"`
TotalPart int64 `protobuf:"varint,5,opt,name=TotalPart,proto3" json:"TotalPart,omitempty"`
Part int64 `protobuf:"varint,6,opt,name=Part,proto3" json:"Part,omitempty"`
Data []byte `protobuf:"bytes,7,opt,name=Data,proto3" json:"Data,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *FileStream) Reset() { *m = FileStream{} }
func (m *FileStream) String() string { return proto.CompactTextString(m) }
func (*FileStream) ProtoMessage() {}
func (*FileStream) Descriptor() ([]byte, []int) {
return fileDescriptor_22a625af4bc1cc87, []int{2}
}
func (m *FileStream) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_FileStream.Unmarshal(m, b)
}
func (m *FileStream) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_FileStream.Marshal(b, m, deterministic)
}
func (m *FileStream) XXX_Merge(src proto.Message) {
xxx_messageInfo_FileStream.Merge(m, src)
}
func (m *FileStream) XXX_Size() int {
return xxx_messageInfo_FileStream.Size(m)
}
func (m *FileStream) XXX_DiscardUnknown() {
xxx_messageInfo_FileStream.DiscardUnknown(m)
}
var xxx_messageInfo_FileStream proto.InternalMessageInfo
func (m *FileStream) GetHeader() *Header {
if m != nil {
return m.Header
}
return nil
}
func (m *FileStream) GetName() string {
if m != nil {
return m.Name
}
return ""
}
func (m *FileStream) GetPurpose() string {
if m != nil {
return m.Purpose
}
return ""
}
func (m *FileStream) GetTotalSize() int64 {
if m != nil {
return m.TotalSize
}
return 0
}
func (m *FileStream) GetTotalPart() int64 {
if m != nil {
return m.TotalPart
}
return 0
}
func (m *FileStream) GetPart() int64 {
if m != nil {
return m.Part
}
return 0
}
func (m *FileStream) GetData() []byte {
if m != nil {
return m.Data
}
return nil
}
type Header struct {
UUID string `protobuf:"bytes,1,opt,name=UUID,proto3" json:"UUID,omitempty"`
Timestamp int64 `protobuf:"varint,2,opt,name=Timestamp,proto3" json:"Timestamp,omitempty"`
From string `protobuf:"bytes,3,opt,name=From,proto3" json:"From,omitempty"`
To string `protobuf:"bytes,4,opt,name=To,proto3" json:"To,omitempty"`
Func string `protobuf:"bytes,5,opt,name=Func,proto3" json:"Func,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Header) Reset() { *m = Header{} }
func (m *Header) String() string { return proto.CompactTextString(m) }
func (*Header) ProtoMessage() {}
func (*Header) Descriptor() ([]byte, []int) {
return fileDescriptor_22a625af4bc1cc87, []int{3}
}
func (m *Header) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Header.Unmarshal(m, b)
}
func (m *Header) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Header.Marshal(b, m, deterministic)
}
func (m *Header) XXX_Merge(src proto.Message) {
xxx_messageInfo_Header.Merge(m, src)
}
func (m *Header) XXX_Size() int {
return xxx_messageInfo_Header.Size(m)
}
func (m *Header) XXX_DiscardUnknown() {
xxx_messageInfo_Header.DiscardUnknown(m)
}
var xxx_messageInfo_Header proto.InternalMessageInfo
func (m *Header) GetUUID() string {
if m != nil {
return m.UUID
}
return ""
}
func (m *Header) GetTimestamp() int64 {
if m != nil {
return m.Timestamp
}
return 0
}
func (m *Header) GetFrom() string {
if m != nil {
return m.From
}
return ""
}
func (m *Header) GetTo() string {
if m != nil {
return m.To
}
return ""
}
func (m *Header) GetFunc() string {
if m != nil {
return m.Func
}
return ""
}
func init() {
proto.RegisterType((*Req)(nil), "proto.Req")
proto.RegisterType((*Res)(nil), "proto.Res")
proto.RegisterType((*FileStream)(nil), "proto.FileStream")
proto.RegisterType((*Header)(nil), "proto.Header")
}
func init() { proto.RegisterFile("plugin.proto", fileDescriptor_22a625af4bc1cc87) }
var fileDescriptor_22a625af4bc1cc87 = []byte{
// 326 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x51, 0xc1, 0x4a, 0xc3, 0x40,
0x10, 0x75, 0xdb, 0x34, 0x35, 0x63, 0x15, 0x9c, 0x53, 0x10, 0x0f, 0x21, 0x20, 0xe4, 0x62, 0x91,
0xfa, 0x03, 0x42, 0x4b, 0xd1, 0x8b, 0x94, 0x6d, 0xfb, 0x01, 0x6b, 0x3b, 0xd8, 0x40, 0x92, 0x4d,
0x37, 0x9b, 0x8b, 0xe0, 0xe7, 0xf9, 0x5f, 0xb2, 0xd3, 0xa4, 0x8d, 0x07, 0x0f, 0x3d, 0xe5, 0xcd,
0x7b, 0x93, 0xf7, 0x66, 0x66, 0x61, 0x54, 0x66, 0xf5, 0x67, 0x5a, 0x8c, 0x4b, 0xa3, 0xad, 0xc6,
0x01, 0x7f, 0xe2, 0x17, 0xe8, 0x4b, 0xda, 0xe3, 0x03, 0xf8, 0xaf, 0xa4, 0xb6, 0x64, 0x42, 0x11,
0x89, 0xe4, 0x6a, 0x72, 0x7d, 0xe8, 0x1a, 0x1f, 0x48, 0xd9, 0x88, 0x88, 0xe0, 0xcd, 0x94, 0x55,
0x61, 0x2f, 0x12, 0xc9, 0x48, 0x32, 0x8e, 0x77, 0xce, 0xa1, 0x3a, 0xc3, 0x61, 0xaa, 0xb7, 0xc4,
0x0e, 0x03, 0xc9, 0x98, 0x5d, 0xa9, 0xda, 0x84, 0xfd, 0x48, 0x24, 0x81, 0x64, 0x7c, 0x4c, 0xf2,
0x3a, 0x49, 0x3f, 0x02, 0x60, 0x9e, 0x66, 0xb4, 0xb4, 0x86, 0x54, 0x7e, 0x46, 0xe2, 0xbb, 0xca,
0x0f, 0x89, 0x81, 0x64, 0x8c, 0x21, 0x0c, 0x17, 0xb5, 0x29, 0x75, 0x45, 0x4d, 0x68, 0x5b, 0xe2,
0x3d, 0x04, 0x2b, 0x6d, 0x55, 0xb6, 0x4c, 0xbf, 0x88, 0xc3, 0xfb, 0xf2, 0x44, 0x1c, 0xd5, 0x85,
0x32, 0x36, 0x1c, 0x74, 0x54, 0x47, 0xb8, 0x24, 0x16, 0x7c, 0x16, 0xbc, 0x96, 0xe3, 0x3d, 0x86,
0x9d, 0x3d, 0x0c, 0x74, 0x66, 0x5b, 0xaf, 0xdf, 0x66, 0xbc, 0x40, 0x20, 0x19, 0x73, 0x46, 0x9a,
0x53, 0x65, 0x55, 0x5e, 0xf2, 0xd0, 0x2e, 0xa3, 0x25, 0xdc, 0x1f, 0x73, 0xa3, 0xf3, 0xf6, 0x56,
0x0e, 0xe3, 0x0d, 0xf4, 0x56, 0x9a, 0x87, 0x0d, 0x64, 0x6f, 0xa5, 0xb9, 0xa7, 0x2e, 0x36, 0x3c,
0xa0, 0xeb, 0xa9, 0x8b, 0xcd, 0xe4, 0x1b, 0xfc, 0x05, 0x3f, 0x3f, 0x46, 0xe0, 0x4d, 0x55, 0x96,
0x21, 0x34, 0xe7, 0x92, 0xb4, 0xbf, 0x3b, 0xe1, 0x2a, 0xbe, 0xc0, 0x47, 0xb8, 0x5c, 0x52, 0xb1,
0x75, 0xa7, 0xc6, 0xdb, 0x46, 0x39, 0xdd, 0xfd, 0x6f, 0x73, 0x22, 0x30, 0x06, 0x6f, 0xba, 0x53,
0xf6, 0x7f, 0xc3, 0x27, 0xf1, 0xe1, 0x73, 0xf9, 0xfc, 0x1b, 0x00, 0x00, 0xff, 0xff, 0xe7, 0x6c,
0xc9, 0x75, 0x84, 0x02, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion4
// PluginClient is the client API for Plugin service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type PluginClient interface {
Call(ctx context.Context, in *Req, opts ...grpc.CallOption) (*Res, error)
SendFile(ctx context.Context, opts ...grpc.CallOption) (Plugin_SendFileClient, error)
Chat(ctx context.Context, in *Req, opts ...grpc.CallOption) (Plugin_ChatClient, error)
}
type pluginClient struct {
cc *grpc.ClientConn
}
func NewPluginClient(cc *grpc.ClientConn) PluginClient {
return &pluginClient{cc}
}
func (c *pluginClient) Call(ctx context.Context, in *Req, opts ...grpc.CallOption) (*Res, error) {
out := new(Res)
err := c.cc.Invoke(ctx, "/proto.Plugin/Call", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *pluginClient) SendFile(ctx context.Context, opts ...grpc.CallOption) (Plugin_SendFileClient, error) {
stream, err := c.cc.NewStream(ctx, &_Plugin_serviceDesc.Streams[0], "/proto.Plugin/SendFile", opts...)
if err != nil {
return nil, err
}
x := &pluginSendFileClient{stream}
return x, nil
}
type Plugin_SendFileClient interface {
Send(*FileStream) error
CloseAndRecv() (*Res, error)
grpc.ClientStream
}
type pluginSendFileClient struct {
grpc.ClientStream
}
func (x *pluginSendFileClient) Send(m *FileStream) error {
return x.ClientStream.SendMsg(m)
}
func (x *pluginSendFileClient) CloseAndRecv() (*Res, error) {
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
m := new(Res)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *pluginClient) Chat(ctx context.Context, in *Req, opts ...grpc.CallOption) (Plugin_ChatClient, error) {
stream, err := c.cc.NewStream(ctx, &_Plugin_serviceDesc.Streams[1], "/proto.Plugin/Chat", opts...)
if err != nil {
return nil, err
}
x := &pluginChatClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type Plugin_ChatClient interface {
Recv() (*Res, error)
grpc.ClientStream
}
type pluginChatClient struct {
grpc.ClientStream
}
func (x *pluginChatClient) Recv() (*Res, error) {
m := new(Res)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// PluginServer is the server API for Plugin service.
type PluginServer interface {
Call(context.Context, *Req) (*Res, error)
SendFile(Plugin_SendFileServer) error
Chat(*Req, Plugin_ChatServer) error
}
// UnimplementedPluginServer can be embedded to have forward compatible implementations.
type UnimplementedPluginServer struct {
}
func (*UnimplementedPluginServer) Call(ctx context.Context, req *Req) (*Res, error) {
return nil, status.Errorf(codes.Unimplemented, "method Call not implemented")
}
func (*UnimplementedPluginServer) SendFile(srv Plugin_SendFileServer) error {
return status.Errorf(codes.Unimplemented, "method SendFile not implemented")
}
func (*UnimplementedPluginServer) Chat(req *Req, srv Plugin_ChatServer) error {
return status.Errorf(codes.Unimplemented, "method Chat not implemented")
}
func RegisterPluginServer(s *grpc.Server, srv PluginServer) {
s.RegisterService(&_Plugin_serviceDesc, srv)
}
func _Plugin_Call_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(Req)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(PluginServer).Call(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/proto.Plugin/Call",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(PluginServer).Call(ctx, req.(*Req))
}
return interceptor(ctx, in, info, handler)
}
func _Plugin_SendFile_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(PluginServer).SendFile(&pluginSendFileServer{stream})
}
type Plugin_SendFileServer interface {
SendAndClose(*Res) error
Recv() (*FileStream, error)
grpc.ServerStream
}
type pluginSendFileServer struct {
grpc.ServerStream
}
func (x *pluginSendFileServer) SendAndClose(m *Res) error {
return x.ServerStream.SendMsg(m)
}
func (x *pluginSendFileServer) Recv() (*FileStream, error) {
m := new(FileStream)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func _Plugin_Chat_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(Req)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(PluginServer).Chat(m, &pluginChatServer{stream})
}
type Plugin_ChatServer interface {
Send(*Res) error
grpc.ServerStream
}
type pluginChatServer struct {
grpc.ServerStream
}
func (x *pluginChatServer) Send(m *Res) error {
return x.ServerStream.SendMsg(m)
}
var _Plugin_serviceDesc = grpc.ServiceDesc{
ServiceName: "proto.Plugin",
HandlerType: (*PluginServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "Call",
Handler: _Plugin_Call_Handler,
},
},
Streams: []grpc.StreamDesc{
{
StreamName: "SendFile",
Handler: _Plugin_SendFile_Handler,
ClientStreams: true,
},
{
StreamName: "Chat",
Handler: _Plugin_Chat_Handler,
ServerStreams: true,
},
},
Metadata: "plugin.proto",
}
syntax = "proto3";
package proto;
service Plugin {
rpc Call(Req) returns (Res) {}
rpc SendFile(stream FileStream) returns (Res) {}
rpc Chat(Req) returns (stream Res) {}
}
message Req {
Header Header = 1;
bytes Data = 2;
}
message Res {
Header Header = 1;
int32 Code = 2;
string Desc = 3;
bytes Data = 4;
}
message FileStream {
Header Header = 1;
string Name = 2;
string Purpose = 3;
int64 TotalSize = 4;
int64 TotalPart = 5;
int64 Part = 6;
bytes Data = 7;
}
message Header {
string UUID = 1;
int64 Timestamp = 2;
string From = 3;
string To = 4;
string Func = 5;
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment