Commit 3651ccc2 authored by Lei Li's avatar Lei Li
Browse files

feat: 支持增量修改

parent a6e286cc
...@@ -138,6 +138,12 @@ func (mgr *PluginProcessMgr) ReloadConf(conf map[string]*PluginProcessConf) { ...@@ -138,6 +138,12 @@ func (mgr *PluginProcessMgr) ReloadConf(conf map[string]*PluginProcessConf) {
mgr.PluginProcessConfMap = conf mgr.PluginProcessConfMap = conf
} }
func (mgr *PluginProcessMgr) ChangeSinglePluginConf(conf *PluginProcessConf) {
mgr.mu.Lock()
defer mgr.mu.Unlock()
mgr.PluginProcessConfMap[conf.Name] = conf
}
func (mgr *PluginProcessMgr) registerProcess(conf *PluginProcessConf) { func (mgr *PluginProcessMgr) registerProcess(conf *PluginProcessConf) {
p, err := newProcess(conf) p, err := newProcess(conf)
if err != nil { if err != nil {
......
...@@ -31,6 +31,15 @@ type pluginClientConn struct { ...@@ -31,6 +31,15 @@ type pluginClientConn struct {
client pb.PluginClient client pb.PluginClient
} }
func New() (*PluginGrpcClient, error) {
h := &PluginGrpcClient{
pluginMap: make(map[string]*pluginClientConn),
pluginConf: make(map[string]string),
}
return h, nil
}
func NewPluginGrpcClient(pluginConf map[string]string) (*PluginGrpcClient, error) { func NewPluginGrpcClient(pluginConf map[string]string) (*PluginGrpcClient, error) {
h := &PluginGrpcClient{ h := &PluginGrpcClient{
pluginMap: make(map[string]*pluginClientConn), pluginMap: make(map[string]*pluginClientConn),
...@@ -61,16 +70,54 @@ func NewPluginGrpcClient(pluginConf map[string]string) (*PluginGrpcClient, error ...@@ -61,16 +70,54 @@ func NewPluginGrpcClient(pluginConf map[string]string) (*PluginGrpcClient, error
return h, nil return h, nil
} }
func (h *PluginGrpcClient) Close() { func (p *PluginGrpcClient) NewPluginClient(pluginConf map[string]string) error {
for _, cc := range h.pluginMap { for name, unixPath := range pluginConf {
if !strings.HasPrefix(unixPath, "unix:") {
unixPath = "unix:" + unixPath
}
conn, err := grpc.Dial(unixPath,
grpc.WithTransportCredentials(insecure.NewCredentials()),
// grpc.WithKeepaliveParams(keepalive.ClientParameters{
// Time: 10 * time.Second,
// Timeout: 20 * time.Second,
// }),
)
if err != nil {
return fmt.Errorf("failed to dial %s, err:%v", unixPath, err)
}
client := pb.NewPluginClient(conn)
p.pluginMap[name] = &pluginClientConn{conn: conn, client: client}
p.pluginConf[name] = unixPath
}
return nil
}
func (p *PluginGrpcClient) Close() {
for _, cc := range p.pluginMap {
cc.conn.Close()
}
p.pluginMap = make(map[string]*pluginClientConn)
p.pluginConf = make(map[string]string)
}
func (p *PluginGrpcClient) CloseSingle(name string) {
for pName, cc := range p.pluginMap {
if pName == name {
cc.conn.Close() cc.conn.Close()
delete(p.pluginMap, pName)
delete(p.pluginConf, name)
return
}
} }
} }
func (c *PluginGrpcClient) Call(ctx context.Context, req *pb.Req) (*pb.Res, error) { func (p *PluginGrpcClient) Call(ctx context.Context, req *pb.Req) (*pb.Res, error) {
// 查找目的插件对应的client // 查找目的插件对应的client
dstPlugin := req.Header.To dstPlugin := req.Header.To
cc, ok := c.pluginMap[dstPlugin] cc, ok := p.pluginMap[dstPlugin]
if !ok { if !ok {
return nil, fmt.Errorf("dst plugin conn %s not found", dstPlugin) return nil, fmt.Errorf("dst plugin conn %s not found", dstPlugin)
} }
...@@ -78,10 +125,10 @@ func (c *PluginGrpcClient) Call(ctx context.Context, req *pb.Req) (*pb.Res, erro ...@@ -78,10 +125,10 @@ func (c *PluginGrpcClient) Call(ctx context.Context, req *pb.Req) (*pb.Res, erro
return cc.client.Call(ctx, req) return cc.client.Call(ctx, req)
} }
func (c *PluginGrpcClient) SendFile(ctx context.Context, fs *pb.FileStream, filePath string) (*pb.Res, error) { func (p *PluginGrpcClient) SendFile(ctx context.Context, fs *pb.FileStream, filePath string) (*pb.Res, error) {
// 查找目的插件对应的client // 查找目的插件对应的client
dstPlugin := fs.Header.To dstPlugin := fs.Header.To
cc, ok := c.pluginMap[dstPlugin] cc, ok := p.pluginMap[dstPlugin]
if !ok { if !ok {
return nil, fmt.Errorf("dst plugin conn %s not found", dstPlugin) return nil, fmt.Errorf("dst plugin conn %s not found", dstPlugin)
} }
...@@ -125,10 +172,10 @@ func (c *PluginGrpcClient) SendFile(ctx context.Context, fs *pb.FileStream, file ...@@ -125,10 +172,10 @@ func (c *PluginGrpcClient) SendFile(ctx context.Context, fs *pb.FileStream, file
return res, nil return res, nil
} }
func (c *PluginGrpcClient) Chat(ctx context.Context, req *pb.Req) (pb.Plugin_ChatClient, error) { func (p *PluginGrpcClient) Chat(ctx context.Context, req *pb.Req) (pb.Plugin_ChatClient, error) {
// 查找目的插件对应的client // 查找目的插件对应的client
dstPlugin := req.Header.To dstPlugin := req.Header.To
cc, ok := c.pluginMap[dstPlugin] cc, ok := p.pluginMap[dstPlugin]
if !ok { if !ok {
return nil, fmt.Errorf("dst plugin conn %s not found", dstPlugin) return nil, fmt.Errorf("dst plugin conn %s not found", dstPlugin)
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment