Commit ba515042 authored by Lei Li's avatar Lei Li
Browse files

feat: 支持MQTT通信

parent 9b54ace3
package core package core
import ( import (
"agent/cmd/agent/config"
"agent/cmd/agent/global"
"agent/cmd/agent/option"
"agent/module"
"agent/module/plugin"
"agent/module/resources"
iTime "agent/pkg/time"
"fmt" "fmt"
"os" "os"
"path/filepath" "path/filepath"
...@@ -15,6 +8,14 @@ import ( ...@@ -15,6 +8,14 @@ import (
"syscall" "syscall"
"time" "time"
"agent/cmd/agent/config"
"agent/cmd/agent/global"
"agent/cmd/agent/option"
"agent/module"
"agent/module/backend"
"agent/module/plugin"
"agent/module/resources"
iTime "agent/pkg/time"
"linkfog.com/public/lib/l" "linkfog.com/public/lib/l"
"linkfog.com/public/lib/monitor_daemon" "linkfog.com/public/lib/monitor_daemon"
...@@ -33,6 +34,9 @@ func (a *Agent) Init(env svc.Environment) error { ...@@ -33,6 +34,9 @@ func (a *Agent) Init(env svc.Environment) error {
// 获取平台和内核版本 // 获取平台和内核版本
global.GetPlatformInfo() global.GetPlatformInfo()
// 初始化MQTT信息
global.InitMQTTInfo()
// 设置工作目录 // 设置工作目录
err := global.SetWorkDir() err := global.SetWorkDir()
if err != nil { if err != nil {
...@@ -101,6 +105,8 @@ func (a *Agent) Start() error { ...@@ -101,6 +105,8 @@ func (a *Agent) Start() error {
if err := startSvcModules(); err != nil { if err := startSvcModules(); err != nil {
return err return err
} }
go a.chatMsg()
return nil return nil
} }
...@@ -128,7 +134,6 @@ func genVersion() string { ...@@ -128,7 +134,6 @@ func genVersion() string {
} }
func initSvcCfg() error { func initSvcCfg() error {
// 异常情况处理 // 异常情况处理
if _, err := os.Stat(global.SelfOperatedAndroid); os.IsExist(err) { if _, err := os.Stat(global.SelfOperatedAndroid); os.IsExist(err) {
l.Infof("the host is a self operated Android device") l.Infof("the host is a self operated Android device")
...@@ -136,7 +141,12 @@ func initSvcCfg() error { ...@@ -136,7 +141,12 @@ func initSvcCfg() error {
time.Sleep(24 * time.Hour) time.Sleep(24 * time.Hour)
} }
} }
// TODO 序列号设置 // TODO: 序列号设置
if option.Opt.SerialNumber != "" {
global.DeviceSerialNumber = option.Opt.SerialNumber
} else {
}
// 时间修正 // 时间修正
if !iTime.SynchronizeSystemTime() { if !iTime.SynchronizeSystemTime() {
...@@ -145,10 +155,17 @@ func initSvcCfg() error { ...@@ -145,10 +155,17 @@ func initSvcCfg() error {
// 获取安卓信息 // 获取安卓信息
global.GetAndroidInfo() global.GetAndroidInfo()
l.Infof("platform: %s, kernelVersion: %s, workdir: %s, deviceSN: %s", global.HostInfoTail.Platform,
global.HostInfoTail.KernelVersion, config.WorkDir, global.DeviceSerialNumber)
return nil return nil
} }
func startSvcModules() error { func startSvcModules() error {
// 启动后端通信
if err := module.RegisterModule(backend.New()); err != nil {
return fmt.Errorf("register module backend err:%v", err)
}
// 启动插件管理器 // 启动插件管理器
if option.Opt.EnablePluginManagerModule { if option.Opt.EnablePluginManagerModule {
if err := module.RegisterModule(plugin.New()); err != nil { if err := module.RegisterModule(plugin.New()); err != nil {
...@@ -162,6 +179,7 @@ func startSvcModules() error { ...@@ -162,6 +179,7 @@ func startSvcModules() error {
return fmt.Errorf("register module resources err:%v", err) return fmt.Errorf("register module resources err:%v", err)
} }
} }
return nil return nil
} }
......
package core
import (
"agent/module"
"time"
"agent/cmd/agent/global"
"linkfog.com/public/lib/l"
)
func (a *Agent) chatMsg() {
ticker := time.NewTicker(3 * time.Second)
for {
select {
case msg := <-global.HostInfoTail.MQTTInfo.ConsumeChan:
a.chatMsgProcess(msg)
case <-ticker.C:
if a.IsStop {
l.Info("agent chat exit")
return
}
}
}
}
func (a *Agent) chatMsgProcess(msg *global.Message) {
l.Infof("agent chat msg process msg:%+v", msg)
switch msg.Key {
case global.ConsumerTopicAgentSelfUpgrade:
case global.ConsumerTopicPluginUpgrade:
err := module.SendMsgToMod(global.PluginModuleName, msg)
if err != nil {
l.Error(err)
return
}
fallthrough
case global.ConsumerTopicStartupPlugin:
fallthrough
case global.ConsumerTopicStopPlugin:
}
}
package global package global
import ( import (
"agent/cmd/agent/config"
"os" "os"
"os/exec" "os/exec"
"os/user" "os/user"
...@@ -10,6 +9,7 @@ import ( ...@@ -10,6 +9,7 @@ import (
"runtime" "runtime"
"strings" "strings"
"agent/cmd/agent/config"
"linkfog.com/public/lib/l" "linkfog.com/public/lib/l"
"github.com/shirou/gopsutil/host" "github.com/shirou/gopsutil/host"
...@@ -18,6 +18,7 @@ import ( ...@@ -18,6 +18,7 @@ import (
const ( const (
ResourceModuleName = "resources" ResourceModuleName = "resources"
PluginModuleName = "pluginMgr" PluginModuleName = "pluginMgr"
BackendModuleName = "backend"
PlatformNameAndroid = "android" PlatformNameAndroid = "android"
PlatformNameX86 = "x86" PlatformNameX86 = "x86"
...@@ -26,6 +27,10 @@ const ( ...@@ -26,6 +27,10 @@ const (
AndroidVersion4 = "19" AndroidVersion4 = "19"
) )
var (
DeviceSerialNumber string
)
var HostInfoTail *HostInfo var HostInfoTail *HostInfo
type HostInfo struct { type HostInfo struct {
...@@ -36,6 +41,8 @@ type HostInfo struct { ...@@ -36,6 +41,8 @@ type HostInfo struct {
UID string UID string
SDKVersion string SDKVersion string
AndroidVersion4 bool AndroidVersion4 bool
MQTTInfo *MQTTConnInfo
} }
func GetPlatformInfo() { func GetPlatformInfo() {
...@@ -60,7 +67,6 @@ func GetPlatformInfo() { ...@@ -60,7 +67,6 @@ func GetPlatformInfo() {
func SetWorkDir() error { func SetWorkDir() error {
l.Infof("the architecture(GOARCH) type is %s", runtime.GOARCH)
if strings.Contains(runtime.GOARCH, "amd64") || strings.Contains(runtime.GOARCH, "arm64") || if strings.Contains(runtime.GOARCH, "amd64") || strings.Contains(runtime.GOARCH, "arm64") ||
strings.Contains(runtime.GOARCH, "aarch64") { strings.Contains(runtime.GOARCH, "aarch64") {
config.WorkDir = "/linkfog/agent/" config.WorkDir = "/linkfog/agent/"
......
package global
const (
MQTTUsername = "agent"
MQTTPassword = "Linkfog2023@emqx"
MQTTFailedToConnect = 0
MQTTConnected = 1
MQTTReConnecting = 2
ConsumerTopicAgentSelfUpgrade = "agent-self-upgrade"
ConsumerTopicPluginUpgrade = "plugin-upgrade"
ConsumerTopicStartupPlugin = "startup-plugin"
ConsumerTopicStopPlugin = "stop-plugin"
)
type MQTTConnInfo struct {
Status int
ReconnectTimes int
PublishChan chan *Message
ConsumeChan chan *Message
BrokerInfo []BrokerHostInfo
}
type Message struct {
Key string
Payload string
}
type BrokerHostInfo struct {
BrokerIP string
BrokerPort int
}
func InitMQTTInfo() {
m := &MQTTConnInfo{
PublishChan: make(chan *Message, 512),
ConsumeChan: make(chan *Message, 512),
}
// TODO: 获取Broker信息
HostInfoTail.MQTTInfo = m
}
...@@ -8,6 +8,7 @@ replace linkfog.com/pluginx => web.lueluesay.top/git/lil/pluginx v0.0.0-20241014 ...@@ -8,6 +8,7 @@ replace linkfog.com/pluginx => web.lueluesay.top/git/lil/pluginx v0.0.0-20241014
require ( require (
github.com/beevik/ntp v1.4.3 github.com/beevik/ntp v1.4.3
github.com/eclipse/paho.mqtt.golang v1.5.0
github.com/judwhite/go-svc v1.2.1 github.com/judwhite/go-svc v1.2.1
github.com/olekukonko/tablewriter v0.0.5 github.com/olekukonko/tablewriter v0.0.5
github.com/prometheus/procfs v0.15.1 github.com/prometheus/procfs v0.15.1
...@@ -19,12 +20,14 @@ require ( ...@@ -19,12 +20,14 @@ require (
require ( require (
github.com/go-ole/go-ole v1.2.6 // indirect github.com/go-ole/go-ole v1.2.6 // indirect
github.com/golang/protobuf v1.5.4 // indirect github.com/golang/protobuf v1.5.4 // indirect
github.com/gorilla/websocket v1.5.3 // indirect
github.com/mattn/go-runewidth v0.0.9 // indirect github.com/mattn/go-runewidth v0.0.9 // indirect
github.com/satori/go.uuid v1.2.0 // indirect github.com/satori/go.uuid v1.2.0 // indirect
github.com/tklauser/go-sysconf v0.3.14 // indirect github.com/tklauser/go-sysconf v0.3.14 // indirect
github.com/tklauser/numcpus v0.8.0 // indirect github.com/tklauser/numcpus v0.8.0 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect
golang.org/x/net v0.28.0 // indirect golang.org/x/net v0.28.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.26.0 // indirect golang.org/x/sys v0.26.0 // indirect
golang.org/x/text v0.17.0 // indirect golang.org/x/text v0.17.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 // indirect
......
...@@ -2,12 +2,16 @@ github.com/beevik/ntp v1.4.3 h1:PlbTvE5NNy4QHmA4Mg57n7mcFTmr1W1j3gcK7L1lqho= ...@@ -2,12 +2,16 @@ github.com/beevik/ntp v1.4.3 h1:PlbTvE5NNy4QHmA4Mg57n7mcFTmr1W1j3gcK7L1lqho=
github.com/beevik/ntp v1.4.3/go.mod h1:Unr8Zg+2dRn7d8bHFuehIMSvvUYssHMxW3Q5Nx4RW5Q= github.com/beevik/ntp v1.4.3/go.mod h1:Unr8Zg+2dRn7d8bHFuehIMSvvUYssHMxW3Q5Nx4RW5Q=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eclipse/paho.mqtt.golang v1.5.0 h1:EH+bUVJNgttidWFkLLVKaQPGmkTUfQQqjOsyvMGvD6o=
github.com/eclipse/paho.mqtt.golang v1.5.0/go.mod h1:du/2qNQVqJf/Sqs4MEL77kR8QTqANF7XU7Fk0aOTAgk=
github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/judwhite/go-svc v1.2.1 h1:a7fsJzYUa33sfDJRF2N/WXhA+LonCEEY8BJb1tuS5tA= github.com/judwhite/go-svc v1.2.1 h1:a7fsJzYUa33sfDJRF2N/WXhA+LonCEEY8BJb1tuS5tA=
github.com/judwhite/go-svc v1.2.1/go.mod h1:mo/P2JNX8C07ywpP9YtO2gnBgnUiFTHqtsZekJrUuTk= github.com/judwhite/go-svc v1.2.1/go.mod h1:mo/P2JNX8C07ywpP9YtO2gnBgnUiFTHqtsZekJrUuTk=
github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI=
...@@ -36,6 +40,8 @@ github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo ...@@ -36,6 +40,8 @@ github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo
github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE=
golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo=
......
package backend package backend
import (
"fmt"
"strings"
"time"
"agent/cmd/agent/global"
"linkfog.com/public/lib/l"
MQTT "github.com/eclipse/paho.mqtt.golang"
)
type Backend struct {
isRunning bool
client MQTT.Client
ReconnectTimes int
Status int
signal chan struct{}
}
type BackendOpt func(*Backend)
var backendMod *Backend
func New(opts ...BackendOpt) *Backend {
b := Backend{
signal: make(chan struct{}),
}
for _, opt := range opts {
opt(&b)
}
clientOpts := MQTT.NewClientOptions()
for _, b := range global.HostInfoTail.MQTTInfo.BrokerInfo {
clientOpts.AddBroker(fmt.Sprintf("ssl://%s", b.BrokerIP))
}
clientOpts.SetClientID(global.DeviceSerialNumber)
clientOpts.SetUsername(global.MQTTUsername)
clientOpts.SetPassword(global.MQTTPassword)
// TODO: TLS配置
// 设置session持久化订阅,基于clientid,在客户端断开连接时topic不会自动删除
clientOpts.SetCleanSession(false)
// 设置自动重连
clientOpts.SetAutoReconnect(true)
clientOpts.SetMaxReconnectInterval(10 * time.Second)
// 设置回调函数
clientOpts.SetOnConnectHandler(connectCallback)
clientOpts.SetConnectionLostHandler(connectInterruptionCallback)
clientOpts.SetReconnectingHandler(reconnectingCallback)
clientOpts.SetDefaultPublishHandler(messagePubHandler)
// 初始化客户端
b.client = MQTT.NewClient(clientOpts)
backendMod = &b
return &b
}
func (b *Backend) Start() error {
b.isRunning = true
for {
if token := b.client.Connect(); token.Wait() && token.Error() != nil {
l.Infof("MQTT init connecting err :%v", token.Error())
b.Status = global.MQTTFailedToConnect
b.ReconnectTimes++
} else {
l.Info("MQTT init connecting success")
b.Status = global.MQTTConnected
break
}
time.Sleep(2 * time.Minute)
}
go func() {
for {
select {
case msg := <-global.HostInfoTail.MQTTInfo.PublishChan:
topic := translateTopic(msg.Key)
if len(topic) > 0 {
b.client.Publish(topic, 1, false, []byte(msg.Payload))
}
case <-b.signal:
l.Info("MQTT publish exit")
return
}
}
}()
return nil
}
func (b *Backend) Stop() {
if b.IsRunning() {
b.isRunning = false
b.signal <- struct{}{}
b.client.Disconnect(250)
}
}
func (b *Backend) Name() string { return global.BackendModuleName }
func (b *Backend) IsRunning() bool {
return b.isRunning
}
func connectCallback(client MQTT.Client) {
l.Info("MQTT connected")
backendMod.Status = global.MQTTConnected
// 订阅
if token := client.Subscribe(global.DeviceSerialNumber+"/publish/#", 1, nil); token.Wait() && token.Error() != nil {
l.Error(token.Error())
}
//if token := client.Subscribe("device/sleep", 1, nil); token.Wait() && token.Error() != nil {
// l.Error(token.Error())
//}
}
func connectInterruptionCallback(client MQTT.Client, err error) {
l.Infof("MQTT connection interruption err:%s", err)
backendMod.Status = global.MQTTReConnecting
}
func reconnectingCallback(client MQTT.Client, opts *MQTT.ClientOptions) {
l.Info("MQTT reconnecting")
backendMod.ReconnectTimes++
}
func messagePubHandler(client MQTT.Client, msg MQTT.Message) {
l.Infof("MQTT Received message from topic %s, %s", msg.Topic(), string(msg.Payload()))
if strings.HasPrefix(msg.Topic(), global.DeviceSerialNumber+"/publish/") {
global.HostInfoTail.MQTTInfo.ConsumeChan <- &global.Message{
Payload: string(msg.Payload()),
Key: msg.Topic(),
}
}
//} else if msg.Topic() == "device/sleep" {
// mp := make(map[string]interface{})
// err := json.Unmarshal(msg.Payload(), &mp)
// if err != nil {
// l.Errorf("MQTT fail to json Unmarshal, err:%v", err)
// return
// }
// if tmp, ok := mp["sleep"].(string); ok {
// s, _ := strconv.Atoi(tmp)
// time.Sleep(time.Duration(s) * time.Second)
// }
// mp["sn"] = global.DeviceSerialNumber
// data, _ := json.Marshal(mp)
// client.Publish("device/awake", 1, false, data)
//}
}
func translateTopic(key string) (ret string) {
if l := strings.Index(key, "agent."); l != -1 && l < len(key) {
ret = strings.Replace(key[l:], ".", "_", -1)
}
return ret
}
func (b *Backend) Receive(msg *global.Message) error {
return nil
}
package module package module
import ( import (
"agent/cmd/agent/global"
"fmt" "fmt"
"time" "time"
...@@ -12,6 +13,7 @@ type Runnable interface { ...@@ -12,6 +13,7 @@ type Runnable interface {
Start() error Start() error
Stop() Stop()
IsRunning() bool IsRunning() bool
Receive(message *global.Message) error
} }
var moduleTable = map[string]Runnable{} var moduleTable = map[string]Runnable{}
...@@ -58,3 +60,19 @@ func StopEveryModule() { ...@@ -58,3 +60,19 @@ func StopEveryModule() {
// just give 3 second for every module to stop gracefully // just give 3 second for every module to stop gracefully
time.Sleep(3 * time.Second) time.Sleep(3 * time.Second)
} }
func SendMsgToMod(name string, msg *global.Message) error {
if mod, ok := moduleTable[name]; !ok {
return fmt.Errorf("module %s not exists", name)
} else {
if mod.IsRunning() {
err := mod.Receive(msg)
if err != nil {
return err
}
} else {
return fmt.Errorf("module %s is stoped", name)
}
}
return nil
}
...@@ -4,7 +4,6 @@ import ( ...@@ -4,7 +4,6 @@ import (
"agent/cmd/agent/global" "agent/cmd/agent/global"
"fmt" "fmt"
"linkfog.com/public/lib/common" "linkfog.com/public/lib/common"
"strings"
"sync" "sync"
"time" "time"
...@@ -29,7 +28,7 @@ var defaultPluginCliConf = map[string]string{ ...@@ -29,7 +28,7 @@ var defaultPluginCliConf = map[string]string{
} }
type Plugin struct { type Plugin struct {
signal chan string signal chan *global.Message
signalSize int signalSize int
isRunning bool isRunning bool
plgServer *pluginServer plgServer *pluginServer
...@@ -44,12 +43,12 @@ type PluginOpt func(*Plugin) ...@@ -44,12 +43,12 @@ type PluginOpt func(*Plugin)
func New(opts ...PluginOpt) *Plugin { func New(opts ...PluginOpt) *Plugin {
p := Plugin{ p := Plugin{
signalSize: 10, signalSize: 512,
} }
for _, opt := range opts { for _, opt := range opts {
opt(&p) opt(&p)
} }
p.signal = make(chan string, p.signalSize) p.signal = make(chan *global.Message, p.signalSize)
return &p return &p
} }
...@@ -103,7 +102,7 @@ func (p *Plugin) Start() error { ...@@ -103,7 +102,7 @@ func (p *Plugin) Start() error {
} }
go func() { go func() {
var sig string var sig *global.Message
var ok bool var ok bool
for { for {
select { select {
...@@ -113,11 +112,6 @@ func (p *Plugin) Start() error { ...@@ -113,11 +112,6 @@ func (p *Plugin) Start() error {
l.Info("module resources exit") l.Info("module resources exit")
return return
} else { } else {
if len(sig) < 120 {
l.Info("module resources receive signal:", sig)
} else {
l.Info("module resources receive signal:", sig[:120])
}
p.dealWithSig(sig) p.dealWithSig(sig)
} }
} }
...@@ -152,23 +146,22 @@ func (p *Plugin) Stop() { ...@@ -152,23 +146,22 @@ func (p *Plugin) Stop() {
} }
} }
func (p *Plugin) Receive(msg string) error { func (p *Plugin) Receive(msg *global.Message) error {
if len(p.signal) > (p.signalSize - 2) { if len(p.signal) > (p.signalSize - 2) {
return l.WrapError("signal chan is full, drop msg:", msg) return l.WrapError("plugin manager signal chan is full, drop msg:", msg.Key)
} }
p.signal <- msg p.signal <- msg
return nil return nil
} }
func (p *Plugin) dealWithSig(msgStr string) { func (p *Plugin) dealWithSig(msg *global.Message) {
splits := strings.SplitN(msgStr, ":", 3) defer common.TimeCost("deal sig finished, msgType:" + msg.Key)()
if len(splits) != 3 {
l.Warn("bad msg fmt from server", msgStr) switch msg.Key {
return case global.ConsumerTopicPluginUpgrade:
}
msgType := splits[1]
defer common.TimeCost("deal sig finished, msgType:" + msgType)() case global.ConsumerTopicStartupPlugin:
l.Info("ignore sig:", msgStr) case global.ConsumerTopicStopPlugin:
}
} }
...@@ -58,3 +58,7 @@ func (r *Resources) IsRunning() bool { ...@@ -58,3 +58,7 @@ func (r *Resources) IsRunning() bool {
return r.isRunning return r.isRunning
} }
func (r *Resources) Receive(msg *global.Message) error {
return nil
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment