Commit 7584cff0 authored by Lei Li's avatar Lei Li
Browse files

fix: broker信息从远程服务提取

parent b9686f4c
...@@ -43,11 +43,13 @@ type HostInfo struct { ...@@ -43,11 +43,13 @@ type HostInfo struct {
AndroidVersion4 bool AndroidVersion4 bool
MQTTInfo *MQTTConnInfo MQTTInfo *MQTTConnInfo
OpenAPIInfo []*remoteHostInfo
} }
func GetPlatformInfo() { func GetPlatformInfo() {
HostInfoTail = &HostInfo{ HostInfoTail = &HostInfo{
Platform: PlatformNameAndroid, Platform: PlatformNameAndroid,
OpenAPIInfo: make([]*remoteHostInfo, 0),
} }
hostInfo, err := host.Info() hostInfo, err := host.Info()
......
package global package global
import (
"agent/pkg/encode"
"encoding/json"
"io"
"linkfog.com/public/lib/l"
"net/http"
"strconv"
"time"
)
const ( const (
MQTTUsername = "agent" MQTTUsername = "agent"
MQTTPassword = "Linkfog2023@emqx" MQTTPassword = "Linkfog2023@emqx"
MQTTDomainKey = "linkfog@@@"
MQTTHmacKey = "4N][zmn;bPW!a>F~iU+["
MQTTFailedToConnect = 0 MQTTFailedToConnect = 0
MQTTConnected = 1 MQTTConnected = 1
...@@ -14,6 +26,8 @@ const ( ...@@ -14,6 +26,8 @@ const (
ConsumerTopicStopPlugin = "stop-plugin" ConsumerTopicStopPlugin = "stop-plugin"
PublishTopicReportInfo = "report-info" PublishTopicReportInfo = "report-info"
RemoteHostInfoURL = "https://in.soeasynetwork.com/api/services"
) )
var SubscribePrefixInfo = "" var SubscribePrefixInfo = ""
...@@ -23,7 +37,7 @@ type MQTTConnInfo struct { ...@@ -23,7 +37,7 @@ type MQTTConnInfo struct {
ReconnectTimes int ReconnectTimes int
PublishChan chan *Message PublishChan chan *Message
ConsumeChan chan *Message ConsumeChan chan *Message
BrokerInfo []BrokerHostInfo BrokerInfo []*remoteHostInfo
} }
type Message struct { type Message struct {
...@@ -31,17 +45,145 @@ type Message struct { ...@@ -31,17 +45,145 @@ type Message struct {
Payload string Payload string
} }
type BrokerHostInfo struct { type remoteHostInfo struct {
BrokerIP string Node string
BrokerPort int IP string
Port int
} }
func InitMQTTInfo() { func InitMQTTInfo() {
m := &MQTTConnInfo{ m := &MQTTConnInfo{
PublishChan: make(chan *Message, 512), PublishChan: make(chan *Message, 512),
ConsumeChan: make(chan *Message, 512), ConsumeChan: make(chan *Message, 512),
BrokerInfo: make([]*remoteHostInfo, 0),
} }
// TODO: 获取Broker信息
HostInfoTail.MQTTInfo = m HostInfoTail.MQTTInfo = m
getRemoteHostInfo()
}
func getRemoteHostInfo() {
ts := time.Now().Unix()
tsDeci := strconv.FormatInt(ts, 10)
auth := encode.CalcDevHexedSign(MQTTDomainKey, MQTTHmacKey, tsDeci, MQTTHmacKey)
req, err := http.NewRequest("GET", RemoteHostInfoURL, nil)
if err != nil {
l.Errorf("fail to make mq domain service request, err:%v", err)
} else {
req.Header.Set("auth", auth)
req.Header.Set("ts", tsDeci)
client := http.Client{Timeout: 30 * time.Second}
resp, err := client.Do(req)
if err != nil {
l.Errorf("fail to request mq domain service, err:%v", err)
} else {
defer resp.Body.Close()
b, err := io.ReadAll(resp.Body)
if err != nil {
l.Errorf("fail to ReadAll mq domain service, err:%v", err)
} else {
parseRemoteInfoFromResponse(b)
}
}
}
}
func parseRemoteInfoFromResponse(resp []byte) {
root := make(map[string]interface{})
err := json.Unmarshal(resp, &root)
if err != nil {
return
}
if c, ok := root["code"].(float64); !ok {
return
} else {
if c != 200 {
return
}
}
data, ok := root["data"].(map[string]interface{})
if !ok {
l.Warnf("fail to parse mq domain service response(data), err:%v", string(resp))
return
}
//openapi
saas, ok := data["openapi"].(map[string]interface{})
if !ok {
l.Warnf("fail to parse mq domain service response(data|openapi), err:%v", string(resp))
return
}
mst, ok := saas["master"].([]interface{})
if !ok {
l.Warnf("fail to parse mq domain service response(saas|master), err:%v", string(resp))
return
}
for _, v := range mst {
ins, ok := v.(map[string]interface{})
if !ok {
continue
}
host, ok := ins["host"].(string)
if !ok {
continue
}
HostInfoTail.OpenAPIInfo = append(HostInfoTail.OpenAPIInfo, &remoteHostInfo{
IP: host,
})
}
//mqtt
mqtt, ok := data["mqtt"].(map[string]interface{})
if !ok {
l.Warnf("fail to parse mq domain service response(data|mqtt), err:%v", string(resp))
return
}
mst, ok = mqtt["master"].([]interface{})
if !ok {
l.Warnf("fail to parse mq domain service response(mqtt|master), err:%v", string(resp))
return
}
bkp, ok := mqtt["backup"].([]interface{})
if !ok {
l.Warnf("fail to parse mq domain service response(mqtt|backup), err:%v", string(resp))
return
}
for _, v := range mst {
ins, ok := v.(map[string]interface{})
if !ok {
continue
}
host, ok := ins["host"].(string)
if !ok {
continue
}
HostInfoTail.MQTTInfo.BrokerInfo = append(HostInfoTail.MQTTInfo.BrokerInfo, &remoteHostInfo{
IP: host,
})
}
for _, v := range bkp {
ins, ok := v.(map[string]interface{})
if !ok {
continue
}
host, ok := ins["host"].(string)
if !ok {
continue
}
HostInfoTail.MQTTInfo.BrokerInfo = append(HostInfoTail.MQTTInfo.BrokerInfo, &remoteHostInfo{
IP: host,
})
}
return
} }
...@@ -41,7 +41,6 @@ const ( ...@@ -41,7 +41,6 @@ const (
Dcs9942URL = "https://newdcs.bkdomain.cn:9992" Dcs9942URL = "https://newdcs.bkdomain.cn:9992"
DcsUploadFlowDeviceAPI = "/api/v4/sds/uploadFlowDevice" DcsUploadFlowDeviceAPI = "/api/v4/sds/uploadFlowDevice"
HmacKey = "4N][zmn;bPW!a>F~iU+["
HmacKey2 = "semigroupal" HmacKey2 = "semigroupal"
) )
......
...@@ -37,7 +37,7 @@ func New(opts ...BackendOpt) *Backend { ...@@ -37,7 +37,7 @@ func New(opts ...BackendOpt) *Backend {
clientOpts := MQTT.NewClientOptions() clientOpts := MQTT.NewClientOptions()
for _, b := range global.HostInfoTail.MQTTInfo.BrokerInfo { for _, b := range global.HostInfoTail.MQTTInfo.BrokerInfo {
clientOpts.AddBroker(fmt.Sprintf("ssl://%s", b.BrokerIP)) clientOpts.AddBroker(fmt.Sprintf("ssl://%s", b.IP))
} }
// NOTE: 自测使用 // NOTE: 自测使用
......
package encode
import (
"crypto/hmac"
"crypto/sha512"
"encoding/hex"
)
func CalcDevHexedSign(modelId, devID, date string, hmacKey string) string {
mac := hmac.New(sha512.New, []byte(hmacKey))
mac.Write([]byte(modelId + devID + date))
return hex.EncodeToString(mac.Sum(nil))
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment