Commit d16eda57 authored by Lei Li's avatar Lei Li
Browse files

feat: 增加流式RPC测试代码

parent 897ad942
example/plugin_rpc/client/client
example/plugin_rpc/server/server
example/plugin_mgr/plugin_mgr
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml
<?xml version="1.0" encoding="UTF-8"?>
<module version="4">
<component name="Go" enabled="true" />
</module>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="" vcs="Git" />
</component>
</project>
\ No newline at end of file
...@@ -22,6 +22,7 @@ func main() { ...@@ -22,6 +22,7 @@ func main() {
call(client) call(client)
sendFile(client) sendFile(client)
chat(client)
} }
func call(client *pluginrpc.PluginGrpcClient) { func call(client *pluginrpc.PluginGrpcClient) {
...@@ -54,3 +55,20 @@ func sendFile(client *pluginrpc.PluginGrpcClient) { ...@@ -54,3 +55,20 @@ func sendFile(client *pluginrpc.PluginGrpcClient) {
l.Info("SendFile res header:", res.Header) l.Info("SendFile res header:", res.Header)
l.Infof("SendFile res code:%d, desc:%s, data:%s", res.Code, res.Desc, string(res.Data)) l.Infof("SendFile res code:%d, desc:%s, data:%s", res.Code, res.Desc, string(res.Data))
} }
func chat(client *pluginrpc.PluginGrpcClient) {
req := pluginrpc.NewReq("agent", "test-plugin", "chat", []byte("chat"))
chatClient, err := client.Chat(context.Background(), req)
if err != nil {
l.Error("ChatClient err:", err)
return
}
for {
res, err := chatClient.Recv()
if err != nil {
l.Error("ChatClient receive err:", err)
return
}
l.Infof("receive message: %v", res)
}
}
...@@ -2,6 +2,7 @@ package main ...@@ -2,6 +2,7 @@ package main
import ( import (
"context" "context"
"time"
"linkfog.com/pluginx/pluginrpc" "linkfog.com/pluginx/pluginrpc"
pb "linkfog.com/pluginx/proto" pb "linkfog.com/pluginx/proto"
...@@ -31,6 +32,16 @@ func SendFileHandle(fs *pb.FileStream, filePath string) { ...@@ -31,6 +32,16 @@ func SendFileHandle(fs *pb.FileStream, filePath string) {
l.Infof("recv file name:%s, purpose:%s, size:%d, path:%s", fs.Name, fs.Purpose, fs.TotalSize, filePath) l.Infof("recv file name:%s, purpose:%s, size:%d, path:%s", fs.Name, fs.Purpose, fs.TotalSize, filePath)
} }
func (s *pluginServer) Chat(req *pb.Req, stream pb.Plugin_ChatServer) error {
moduleName := req.GetHeader().GetTo()
l.Infof("RPC:start chat with %s", moduleName)
ch = make(chan *pb.Res, 50)
return pluginrpc.ChatHelper(req, stream, ch)
}
var ch chan *pb.Res
func main() { func main() {
pluginServer := newServer() pluginServer := newServer()
grpcServerHelper, err := pluginrpc.NewPluginGrpcServer("/tmp/unix_domain.sock", pluginServer) grpcServerHelper, err := pluginrpc.NewPluginGrpcServer("/tmp/unix_domain.sock", pluginServer)
...@@ -40,5 +51,23 @@ func main() { ...@@ -40,5 +51,23 @@ func main() {
} }
defer grpcServerHelper.Close() defer grpcServerHelper.Close()
time.Sleep(10 * time.Second)
go func() {
msg := &pb.Res{Header: &pb.Header{
UUID: "123456",
Timestamp: time.Now().Unix(),
From: "test-plugin",
To: "agent",
Func: "",
}, Code: 0, Desc: "chat info",
}
for {
msg.Header.Timestamp = time.Now().Unix()
msg.Code += 1
ch <- msg
time.Sleep(5 * time.Second)
}
}()
select {} select {}
} }
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
"io" "io"
"net" "net"
"os" "os"
"sync"
"time" "time"
"google.golang.org/grpc" "google.golang.org/grpc"
...@@ -119,3 +120,33 @@ func SendFileHelper(stream pb.Plugin_SendFileServer, callback SendFileCallBack) ...@@ -119,3 +120,33 @@ func SendFileHelper(stream pb.Plugin_SendFileServer, callback SendFileCallBack)
res.Desc = "success" res.Desc = "success"
return stream.SendAndClose(res) return stream.SendAndClose(res)
} }
func ChatHelper(msg *pb.Req, stream pb.Plugin_ChatServer, res <-chan *pb.Res) error {
moduleName := msg.GetHeader().GetTo()
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-stream.Context().Done():
l.Info("stream closed", moduleName)
return
case sig, ok := <-res:
if !ok {
l.Warn("sig chan closed", moduleName)
return
}
l.Infof(" send message: %v", msg)
if err := stream.Send(sig); err != nil {
l.Error(err)
return
}
}
}
}()
wg.Wait()
return nil
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment