package backend import ( "crypto/tls" "crypto/x509" "fmt" "strings" "time" "agent/cmd/agent/config" "agent/cmd/agent/global" "agent/cmd/agent/option" "linkfog.com/public/lib/l" MQTT "github.com/eclipse/paho.mqtt.golang" ) type Backend struct { isRunning bool client MQTT.Client ReconnectTimes int Status int signal chan struct{} } type BackendOpt func(*Backend) var backendMod *Backend func New(opts ...BackendOpt) *Backend { b := Backend{ signal: make(chan struct{}), } for _, opt := range opts { opt(&b) } clientOpts := MQTT.NewClientOptions() for _, b := range global.HostInfoTail.MQTTInfo.BrokerInfo { clientOpts.AddBroker(fmt.Sprintf("ssl://%s", b.IP)) } // NOTE: 自测使用 if config.Edition == "dev" { clientOpts.AddBroker(fmt.Sprintf("%s:%d", option.Opt.MQTTBrokerURL, option.Opt.MQTTBrokerPort)) } clientOpts.SetClientID(global.DeviceSerialNumber) clientOpts.SetUsername(global.MQTTUsername) clientOpts.SetPassword(global.MQTTPassword) // TLS配置 tlsConfig, err := newTlsConfig() if err != nil { l.Error(err) return nil } clientOpts.SetTLSConfig(tlsConfig) // 设置session持久化订阅,基于clientid,在客户端断开连接时topic不会自动删除 clientOpts.SetCleanSession(false) // 设置自动重连 clientOpts.SetAutoReconnect(true) clientOpts.SetMaxReconnectInterval(10 * time.Second) // 设置回调函数 clientOpts.SetOnConnectHandler(connectCallback) clientOpts.SetConnectionLostHandler(connectInterruptionCallback) clientOpts.SetReconnectingHandler(reconnectingCallback) clientOpts.SetDefaultPublishHandler(handleSubscribeMsgCallback) // 初始化客户端 b.client = MQTT.NewClient(clientOpts) backendMod = &b return &b } func (b *Backend) Start() error { b.isRunning = true for { if token := b.client.Connect(); token.Wait() && token.Error() != nil { l.Warnf("MQTT init connecting err :%v", token.Error()) b.Status = global.MQTTFailedToConnect b.ReconnectTimes++ } else { l.Info("MQTT init connecting success") b.Status = global.MQTTConnected break } time.Sleep(2 * time.Minute) } go func() { for { select { case msg := <-global.HostInfoTail.MQTTInfo.PublishChan: topic := translateTopic(msg.Key) if len(topic) > 0 { b.client.Publish(topic, 1, false, []byte(msg.Payload)) } case <-b.signal: l.Info("MQTT exit") return } } }() return nil } func (b *Backend) Stop() { if b.IsRunning() { b.isRunning = false b.signal <- struct{}{} b.client.Disconnect(250) } } func (b *Backend) Name() string { return global.BackendModuleName } func (b *Backend) IsRunning() bool { return b.isRunning } func connectCallback(client MQTT.Client) { l.Info("MQTT connected") backendMod.Status = global.MQTTConnected // 订阅 if token := client.Subscribe(global.DeviceSerialNumber+"/publish/#", 1, nil); token.Wait() && token.Error() != nil { l.Error(token.Error()) } //if token := client.Subscribe("device/sleep", 1, nil); token.Wait() && token.Error() != nil { // l.Error(token.Error()) //} } func connectInterruptionCallback(client MQTT.Client, err error) { l.Infof("MQTT connection interruption err:%s", err) backendMod.Status = global.MQTTReConnecting } func reconnectingCallback(client MQTT.Client, opts *MQTT.ClientOptions) { l.Info("MQTT reconnecting") backendMod.ReconnectTimes++ } func handleSubscribeMsgCallback(client MQTT.Client, msg MQTT.Message) { l.Infof("MQTT Received message from topic %s, %s", msg.Topic(), string(msg.Payload())) if strings.HasPrefix(msg.Topic(), global.SubscribePrefixInfo) { key := msg.Topic()[strings.Index(msg.Topic(), global.SubscribePrefixInfo)+len(global.SubscribePrefixInfo):] global.HostInfoTail.MQTTInfo.ConsumeChan <- &global.Message{ Payload: string(msg.Payload()), Key: key, } } //} else if msg.Topic() == "device/sleep" { // mp := make(map[string]interface{}) // err := json.Unmarshal(msg.Payload(), &mp) // if err != nil { // l.Errorf("MQTT fail to json Unmarshal, err:%v", err) // return // } // if tmp, ok := mp["sleep"].(string); ok { // s, _ := strconv.Atoi(tmp) // time.Sleep(time.Duration(s) * time.Second) // } // mp["sn"] = global.DeviceSerialNumber // data, _ := json.Marshal(mp) // client.Publish("device/awake", 1, false, data) //} } func translateTopic(key string) (ret string) { if l := strings.Index(key, "agent."); l != -1 && l < len(key) { ret = strings.Replace(key[l:], ".", "_", -1) } return ret } func (b *Backend) Receive(msg *global.Message) error { return nil } func newTlsConfig() (*tls.Config, error) { // 配置 TLS 以忽略证书验证 cfg := new(tls.Config) cfg.RootCAs = x509.NewCertPool() certPemBytes, err := global.CertFS.ReadFile(global.MyCertPem) if err != nil { return nil, fmt.Errorf("load cert pem err:%s", err) } certKeyBytes, err := global.CertFS.ReadFile(global.MyCertKey) if err != nil { return nil, fmt.Errorf("load key pem err:%s", err) } cert, err := tls.X509KeyPair(certPemBytes, certKeyBytes) if err != nil { return nil, fmt.Errorf("parse cert/key err:%s", err) } cfg.Certificates = append(cfg.Certificates, cert) cfg.InsecureSkipVerify = true return cfg, nil }