diff --git a/common/message.go b/common/message.go index 14b5057..72420e7 100755 --- a/common/message.go +++ b/common/message.go @@ -38,12 +38,15 @@ const ( DGTWINS_OPS_WATCH = "Watch" DGTWINS_OPS_SYNC = "Sync" DGTWINS_OPS_DETECT = "Detect" + DGTWINS_OPS_KEEPALIVE = "Keepalive" //State - DGTWINS_STATE_ONLINE ="online" - DGTWINS_STATE_OFFLINE ="offline" + DGTWINS_STATE_CREATED = "created" + DGTWINS_STATE_ONLINE = "online" + DGTWINS_STATE_OFFLINE = "offline" // Resource + DGTWINS_RESOURCE_EDGE ="edge" DGTWINS_RESOURCE_TWINS ="twins" DGTWINS_RESOURCE_PROPERTY ="property" DGTWINS_RESOURCE_DEVICE ="device" @@ -58,14 +61,14 @@ const ( //Create/update/Delete/Get twins message format type TwinMessage struct{ - Twins []DeviceTwin `json:"twins"` + Twins []DigitalTwin `json:"twins"` } // Response message format type TwinResponse struct{ Code int `json:"code"` Reason string `json:"reason,omitempty"` - Twins []DeviceTwin `json:"twins,omitempty"` + Twins []DigitalTwin `json:"twins,omitempty"` } /* @@ -107,7 +110,7 @@ func GetPropertyValue(props []TwinProperty, name string) *TwinProperty { } // BuildResponseMessage -func BuildResponseMessage(code int, reason string, twins []DeviceTwin) ([]byte, error){ +func BuildResponseMessage(code int, reason string, twins []DigitalTwin) ([]byte, error){ resp := &TwinResponse{ Code: code, Reason: reason, @@ -135,8 +138,25 @@ func UnMarshalResponseMessage(msg *model.Message)(*TwinResponse, error){ return &rspMsg, nil } +//UnMarshalTwinMessage. +func UnMarshalTwinMessage(msg *model.Message)(*TwinMessage, error){ + var twinMsg TwinMessage + + content, ok := msg.Content.([]byte) + if !ok { + return nil, errors.New("invaliad message content") + } + + err := json.Unmarshal(content, &twinMsg) + if err != nil { + return nil, err + } + + return &twinMsg, nil +} + // BuildTwinMessage -func BuildTwinMessage(twins []DeviceTwin) ([]byte, error){ +func BuildTwinMessage(twins []DigitalTwin) ([]byte, error){ twinMsg := &TwinMessage{ Twins: twins, } @@ -165,7 +185,7 @@ func BuildModelMessage(source string, target string, operation string, resource // GetTwinID func GetTwinID(msg *model.Message) string { - var twins []DeviceTwin + var twins []DigitalTwin operation := msg.GetOperation() content, ok := msg.Content.([]byte) @@ -255,3 +275,9 @@ func UnMarshalDeviceResponseMessage(msg *model.Message)(*DeviceResponse, error){ return &respMsg, nil } + +type EdgeInfo struct{ + EdgeID string `json:"edgeid"` + EdgeName string `json:"edgeid,omitempty"` + Description string `json:"edgeid,omitempty"` +} diff --git a/dgtwin/dtcontext/dtcontext.go b/dgtwin/dtcontext/dtcontext.go index 2e4f409..b35e6b1 100755 --- a/dgtwin/dtcontext/dtcontext.go +++ b/dgtwin/dtcontext/dtcontext.go @@ -151,17 +151,15 @@ func (dtc *DTContext) DGTwinIsExist (deviceID string) bool { return true } -func (dtc *DTContext) TwinIsOnline(deviceID string) bool { - v, _ := dtc.DGTwinList.Load(deviceID) +func (dtc *DTContext) GetTwinState(twinID string) string { + v, _ := dtc.DGTwinList.Load(twinID) dgTwin, _ := v.(*common.DigitalTwin) - if dgTwin != nil { - if dgTwin.State == common.DGTWINS_STATE_ONLINE { - return true - } + if dgTwin == nil { + return "" } - return false + return dgTwin.State } func (dtc *DTContext) BuildModelMessage(source string, target string, operation string, resource string, content interface{}) *model.Message { @@ -199,13 +197,9 @@ func (dtc *DTContext) SendResponseMessage(requestMsg *model.Message, content []b } //SendSyncMessage Send sync conten. -func (dtc *DTContext) SendSyncMessage(we *types.WatchEvent, content []byte){ - target := we.Source - resource := we.Resource - +func (dtc *DTContext) SendSyncMessage(target, resource string, content []byte){ modelMsg := dtc.BuildModelMessage(types.MODULE_NAME, target, - common.DGTWINS_OPS_SYNC, resource, content) - modelMsg.SetTag(we.MsgID) + common.DGTWINS_OPS_SYNC, resource, content) klog.Infof("Send sync message (%v)", modelMsg) dtc.SendToModule(types.DGTWINS_MODULE_COMM, modelMsg) diff --git a/dgtwin/dtmodule/comm.go b/dgtwin/dtmodule/comm.go index 2560154..a26422b 100755 --- a/dgtwin/dtmodule/comm.go +++ b/dgtwin/dtmodule/comm.go @@ -44,6 +44,7 @@ func (cm *CommModule) InitModule(dtc *dtcontext.DTContext, comm, heartBeat, conf //TODO: Device should has a healthcheck. func (cm *CommModule) Start(){ //Start loop. + checkTimeoutCh := time.After(10*time.Second) for { select { case msg, ok := <-cm.recieveChan: @@ -87,9 +88,10 @@ func (cm *CommModule) Start(){ klog.Infof("%s module stopped", cm.Name()) return } - case <-time.After(60*time.Second): + case <-checkTimeoutCh: //check the MessageCache for response. cm.dealMessageTimeout() + checkTimeoutCh = time.After(10*time.Second) } } } @@ -113,13 +115,16 @@ func (cm *CommModule) sendMessageToDevice(msg *model.Message) { //sendMessageToHub func (cm *CommModule) sendMessageToHub(msg *model.Message) { - //cache this message for confirm recieve the response. - id := msg.GetID() - _, exist := cm.context.MessageCache.Load(id) - if !exist { - cm.context.MessageCache.Store(id, msg) - } + operation := msg.GetOperation() + if strings.Compare(common.DGTWINS_OPS_RESPONSE, operation) != 0 { + //cache this message for confirm recieve the response. + id := msg.GetID() + _, exist := cm.context.MessageCache.Load(id) + if !exist { + cm.context.MessageCache.Store(id, msg) + } + } //send message to message hub. cm.context.Send(common.HubModuleName, msg) } @@ -152,7 +157,7 @@ func (cm *CommModule) dealMessageTimeout() { timeStamp := msg.GetTimestamp()/1e3 now := time.Now().UnixNano() / 1e9 if now - timeStamp >= types.DGTWINS_MSG_TIMEOUT { - if strings.Compare(common.DeviceName, target) == 0 { + if strings.Contains(target, common.DeviceName) { if strings.Compare(common.DGTWINS_OPS_RESPONSE, operation) != 0 { //mark device status is offline. //send package and tell twin module, device is offline. @@ -173,26 +178,32 @@ func (cm *CommModule) dealMessageTimeout() { cm.context.MessageCache.Delete(key) return true }else{ - if strings.Compare(common.DeviceName, target) == 0 && - strings.Compare(common.DGTWINS_OPS_SYNC, operation) == 0 { + if strings.Contains(target, common.DeviceName) && + strings.Compare(common.DGTWINS_OPS_DETECT, operation) == 0 { // this is a ping message to device, then, we delete this mark // and make this state as offline. twinID := common.GetTwinID(msg) - dgtwin := &common.DeviceTwin{ - ID: twinID, - State: common.DGTWINS_STATE_OFFLINE, - } + twinState := cm.context.GetTwinState(twinID) + if twinState != common.DGTWINS_STATE_CREATED && + twinState != common.DGTWINS_STATE_OFFLINE { + + dgtwin := &common.DeviceTwin{ + ID: twinID, + State: common.DGTWINS_STATE_OFFLINE, + } - msgContent, err := common.BuildDeviceMessage(dgtwin) - if err == nil { - modelMsg := common.BuildModelMessage(types.MODULE_NAME, types.MODULE_NAME, common.DGTWINS_OPS_UPDATE, + msgContent, err := common.BuildDeviceMessage(dgtwin) + if err == nil { + modelMsg := common.BuildModelMessage(types.MODULE_NAME, types.MODULE_NAME, common.DGTWINS_OPS_UPDATE, common.DGTWINS_RESOURCE_TWINS, msgContent) - cm.context.SendToModule(types.DGTWINS_MODULE_TWINS, modelMsg) + cm.context.SendToModule(types.DGTWINS_MODULE_TWINS, modelMsg) + } } - cm.context.MessageCache.Delete(key) + klog.Infof("### Detect Device(%s) is offline", twinID) }else { //resend this message. + klog.Infof("### Resend this message...") cm.context.SendToModule(types.DGTWINS_MODULE_COMM, msg) } return true diff --git a/dgtwin/dtmodule/device.go b/dgtwin/dtmodule/device.go index 1123d1e..b94ef16 100755 --- a/dgtwin/dtmodule/device.go +++ b/dgtwin/dtmodule/device.go @@ -58,6 +58,7 @@ func (dm *TwinModule) InitModule(dtc *dtcontext.DTContext, comm, heartBeat, conf //Start Device module func (dm *TwinModule) Start(){ + KeepaliveCh := time.After(120 *time.Second) //Start loop. for { select { @@ -91,9 +92,11 @@ func (dm *TwinModule) Start(){ klog.Infof("%s module stopped", dm.Name()) return } - case <-time.After(120*time.Second): + case <-KeepaliveCh: //Check & sync device's state. + klog.Infof("####### ping device #############") dm.PingDevice() + KeepaliveCh = time.After(120 *time.Second) } } } @@ -120,7 +123,6 @@ func (dm *TwinModule) twinsCreateHandle(msg *model.Message) (interface{}, error) return nil, err } - twins := make([]common.DeviceTwin, 0) //get all requested twins for key, _ := range twinMsg.Twins { twin := &twinMsg.Twins[key] @@ -130,7 +132,7 @@ func (dm *TwinModule) twinsCreateHandle(msg *model.Message) (interface{}, error) if !exist { dgTwin := &common.DigitalTwin{ ID: twinID, - State: common.DGTWINS_STATE_OFFLINE, + State: common.DGTWINS_STATE_CREATED, } //Create DGTwin is always success since it just create data startuctre @@ -140,19 +142,21 @@ func (dm *TwinModule) twinsCreateHandle(msg *model.Message) (interface{}, error) var deviceMutex sync.Mutex dm.context.DGTwinMutex.Store(twinID, &deviceMutex) //save to sqlite, implement in future. - //TODO: - - twins = append(twins, common.DeviceTwin{ID: twinID}) + //TODO: //detect the physical device // send broadcast to all device, and wait (own this ID) device's response, // if it has reply, then will report all property of this device. - dm.context.SendMessage2Device(common.DGTWINS_OPS_DETECT, twin) + deviceTwin := &common.DeviceTwin{ + ID: twinID, + State: common.DGTWINS_STATE_CREATED, + } + dm.context.SendMessage2Device(common.DGTWINS_OPS_DETECT, deviceTwin) } } //Send response. - msgContent, err := common.BuildResponseMessage(common.RequestSuccessCode, "Success", twins) + msgContent, err := common.BuildResponseMessage(common.RequestSuccessCode, "Success", twinMsg.Twins) if err != nil { //Internal err return nil, err @@ -199,10 +203,17 @@ func (dm *TwinModule) deviceUpdateHandle(msg *model.Message ) (interface{}, erro dm.context.Unlock(twinID) if err == nil { - klog.Infof("######### (%s) is online ##########", twinID) + klog.Infof("######### (%s) is %s ##########", twinID, oldTwin.State) klog.Infof("######### Device information update successful ##########") //notify others about device is online + twins := []common.DigitalTwin{*oldTwin} + msgContent, err := common.BuildTwinMessage(twins) + if err != nil { + return nil, err + } + //Send the Sync message. + dm.context.SendSyncMessage(common.CloudName, common.DGTWINS_RESOURCE_TWINS, msgContent) } else { //Internel err! } @@ -320,7 +331,8 @@ func (dm *TwinModule) deviceDeleteHandle(msg *model.Message) (interface{}, error } //notify the device delete link with dgtwin. - dm.context.SendMessage2Device(common.DGTWINS_OPS_DELETE, dgTwin) + devTwin := &common.DeviceTwin{ID: twinID} + dm.context.SendMessage2Device(common.DGTWINS_OPS_DELETE, devTwin) dm.context.SendResponseMessage(msg, msgContent) } @@ -332,7 +344,7 @@ func (dm *TwinModule) deviceDeleteHandle(msg *model.Message) (interface{}, error // If request twin is not exit, this func will return empty list. func (dm *TwinModule) deviceGetHandle(msg *model.Message) (interface{}, error) { var twinMsg common.TwinMessage - twins := make([]common.DeviceTwin, 0) + twins := make([]common.DigitalTwin, 0) content, ok := msg.Content.([]byte) if !ok { @@ -356,10 +368,7 @@ func (dm *TwinModule) deviceGetHandle(msg *model.Message) (interface{}, error) { return nil, errors.New("invalud digital twin type") } - //convert digital twin to device twin. - deviceTwin := dm.Digital2Device(savedTwin) - - twins = append(twins, *deviceTwin) + twins = append(twins, *savedTwin) }else { // not exist, ignore. } @@ -404,6 +413,7 @@ func (dm *TwinModule) deviceResponseHandle(msg *model.Message) (interface{}, err klog.Infof("Device is online, update device with (%v)", deviceMsg) dm.context.SendToModule(types.DGTWINS_MODULE_COMM, deviceMsg) + case common.DeviceNotReady: } @@ -425,14 +435,12 @@ func (dm *TwinModule) deviceResponseHandle(msg *model.Message) (interface{}, err func (dm *TwinModule) PingDevice() { dm.context.DGTwinList.Range(func(key, value interface{}) bool { twinID := key.(string) - - msgContent, err := common.BuildDeviceMessage(&common.DeviceTwin{ID: twinID}) - if err == nil { - modelMsg := dm.context.BuildModelMessage(types.MODULE_NAME, common.DeviceName, - common.DGTWINS_OPS_SYNC, common.DGTWINS_RESOURCE_DEVICE, msgContent) - dm.context.SendToModule(types.DGTWINS_MODULE_COMM, modelMsg) + twin := &common.DeviceTwin{ + ID: twinID, + State: dm.context.GetTwinState(twinID), } + dm.context.SendMessage2Device(common.DGTWINS_OPS_DETECT, twin) return true }) } diff --git a/dgtwin/dtmodule/property.go b/dgtwin/dtmodule/property.go index feea249..bad4aaa 100755 --- a/dgtwin/dtmodule/property.go +++ b/dgtwin/dtmodule/property.go @@ -3,7 +3,7 @@ package dtmodule import ( "errors" "strings" - "strconv" + _"strconv" "k8s.io/klog" "encoding/json" "github.com/jwzl/edgeOn/common" @@ -13,7 +13,7 @@ import ( ) type PropertyCmdFunc func(msg *model.Message ) error -type PropActionHandle func(msg *model.Message, savedTwin *common.DigitalTwin, msgTwin *common.DeviceTwin) error +type PropActionHandle func(msg *model.Message, savedTwin, msgTwin *common.DigitalTwin) error type PropertyModule struct { // module name name string @@ -38,8 +38,8 @@ func (pm *PropertyModule) initPropertyCmdTbl() { pm.propertyCmdTbl = make(map[string]PropertyCmdFunc) pm.propertyCmdTbl[common.DGTWINS_OPS_UPDATE] = pm.propUpdateHandle - pm.propertyCmdTbl[common.DGTWINS_OPS_DELETE] = pm.propDeleteHandle - pm.propertyCmdTbl[common.DGTWINS_OPS_GET] = pm.propGetHandle + //pm.propertyCmdTbl[common.DGTWINS_OPS_DELETE] = pm.propDeleteHandle + //pm.propertyCmdTbl[common.DGTWINS_OPS_GET] = pm.propGetHandle pm.propertyCmdTbl[common.DGTWINS_OPS_WATCH] = pm.propWatchHandle pm.propertyCmdTbl[common.DGTWINS_OPS_SYNC] = pm.propSyncHandle pm.propertyCmdTbl[common.DGTWINS_OPS_RESPONSE] = pm.propResponseHandle @@ -92,7 +92,7 @@ func (pm *PropertyModule) Start() { //propUpdateHandle: handle update property. func (pm *PropertyModule) propUpdateHandle(msg *model.Message ) error { - return pm.handleMessage(msg, func(msg *model.Message, savedTwin *common.DigitalTwin, msgTwin *common.DeviceTwin) error{ + return pm.handleMessage(msg, func(msg *model.Message, savedTwin, msgTwin *common.DigitalTwin) error{ //savedTwin and msgTwin are always != nil twinID := savedTwin.ID pm.context.Lock(twinID) @@ -104,17 +104,19 @@ func (pm *PropertyModule) propUpdateHandle(msg *model.Message ) error { //if msgTwin.Properties != nil { savedDesired := savedTwin.Properties.Desired newDesired := msgTwin.Properties.Desired + notifyDesired := make([]common.TwinProperty, 0) //Update twin property. for _ , prop := range newDesired { - savedDesired[prop.Name] = &prop + savedDesired[prop.Name] = prop + notifyDesired = append(notifyDesired, *prop) } //} pm.context.Unlock(twinID) //Send the response msgRespWhere := msg.GetSource() - twins := []common.DeviceTwin{*msgTwin} + twins := []common.DigitalTwin{*msgTwin} if strings.Compare(msgRespWhere, types.MODULE_NAME) != 0 { msgContent, err := common.BuildResponseMessage(common.RequestSuccessCode, "Success", twins) @@ -127,7 +129,9 @@ func (pm *PropertyModule) propUpdateHandle(msg *model.Message ) error { } // notify the device. - pm.context.SendMessage2Device(common.DGTWINS_OPS_UPDATE, msgTwin) + devTwin := &common.DeviceTwin{ID : twinID} + devTwin.Properties.Desired = notifyDesired + pm.context.SendMessage2Device(common.DGTWINS_OPS_UPDATE, devTwin) return nil }) @@ -135,9 +139,8 @@ func (pm *PropertyModule) propUpdateHandle(msg *model.Message ) error { //propDeleteHandle: delete property func (pm *PropertyModule) propDeleteHandle(msg *model.Message ) error { - - return pm.handleMessage(msg, func(msg *model.Message, savedTwin *common.DigitalTwin, msgTwin *common.DeviceTwin) error{ - twinID := savedTwin.ID + return pm.handleMessage(msg, func(msg *model.Message, savedTwin,msgTwin *common.DigitalTwin) error{ + /*twinID := savedTwin.ID pm.context.Lock(twinID) savedDesired := savedTwin.Properties.Desired @@ -193,16 +196,18 @@ func (pm *PropertyModule) propDeleteHandle(msg *model.Message ) error { if (newReported != nil && len(newReported) > 0 ) || (newDesired != nil && len(newDesired) > 0 ) { pm.context.SendMessage2Device(common.DGTWINS_OPS_DELETE, msgTwin) - } + }*/ return nil }) + } //propGetHandle: Get property. func (pm *PropertyModule) propGetHandle (msg *model.Message ) error { - return pm.handleMessage(msg, func(msg *model.Message, savedTwin *common.DigitalTwin, msgTwin *common.DeviceTwin) error{ - twinID := savedTwin.ID + + return pm.handleMessage(msg, func(msg *model.Message, savedTwin, msgTwin *common.DigitalTwin) error{ + /*twinID := savedTwin.ID pm.context.Lock(twinID) savedDesired := savedTwin.Properties.Desired @@ -275,9 +280,10 @@ func (pm *PropertyModule) propGetHandle (msg *model.Message ) error { return err } pm.context.SendResponseMessage(msg, msgContent) - + */ return nil }) + } // propWatchHandle: handle property watch. @@ -286,7 +292,7 @@ func (pm *PropertyModule) propGetHandle (msg *model.Message ) error { // If Properties is nil or no properties in request message, we consider it to watch all properties of // this twin. func (pm *PropertyModule) propWatchHandle (msg *model.Message ) error { - return pm.handleMessage(msg, func(msg *model.Message, savedTwin *common.DigitalTwin, msgTwin *common.DeviceTwin) error{ + return pm.handleMessage(msg, func(msg *model.Message, savedTwin, msgTwin *common.DigitalTwin) error{ /*twinID := savedTwin.ID if savedTwin.Properties == nil || len(savedTwin.Properties.Reported) < 1 { pm.sendNotFoundPropMessage(msg, msgTwin) @@ -353,7 +359,7 @@ func (pm *PropertyModule) propWatchHandle (msg *model.Message ) error { //propSyncHandle sync reported property from device. // device will report the update of device's property automaticly. func (pm *PropertyModule) propSyncHandle (msg *model.Message ) error { - var twinMsg common.DeviceMessage + var devMsg common.DeviceMessage if msg.GetSource() != common.DeviceName { klog.Infof("we just process the SYNC data from device.") @@ -365,16 +371,16 @@ func (pm *PropertyModule) propSyncHandle (msg *model.Message ) error { return errors.New("invaliad message content") } - err := json.Unmarshal(content, &twinMsg) + err := json.Unmarshal(content, &devMsg) if err != nil { return err } //Currently, we just support a twin's property's list since // we are just only foucus on the proprty's update. - dgTwin := &twinMsg.Twin + devTwin := &devMsg.Twin - twinID := dgTwin.ID + twinID := devTwin.ID exist := pm.context.DGTwinIsExist(twinID) if exist{ @@ -390,7 +396,8 @@ func (pm *PropertyModule) propSyncHandle (msg *model.Message ) error { //1. save the data. pm.context.Lock(twinID) savedReported := savedTwin.Properties.Reported - newReported := dgTwin.Properties.Reported + newReported := devTwin.Properties.Reported + syncReportedProps := make(map[string]*common.TwinProperty) if newReported != nil && len(newReported) > 0 { if savedReported == nil { @@ -402,51 +409,30 @@ func (pm *PropertyModule) propSyncHandle (msg *model.Message ) error { for _ , prop := range newReported { if _, ok := savedReported[prop.Name]; ok { savedReported[prop.Name] = &prop + syncReportedProps[prop.Name] = &prop } } } pm.context.Unlock(twinID) + //2. send response to device . - msgContent, err := common.BuildDeviceResponseMessage(strconv.Itoa(common.RequestSuccessCode), "SYNC Success", dgTwin) + /*msgContent, err := common.BuildDeviceResponseMessage(strconv.Itoa(common.RequestSuccessCode), "SYNC Success", dgTwin) if err != nil { return err } - pm.context.SendResponseMessage(msg, msgContent) - - //3. Check the cache event, if these property is cached, then send SYNC to target - /*pm.context.RangeWatchCache(func(key, value interface{}) bool { - ID := key.(string) - if twinID != ID { - return true - } - - pm.context.Lock(twinID) - newReported := dgTwin.Properties.Reported - syncProps := make(map[string]*types.PropertyValue) - watchEvent := value.(*types.WatchEvent) - for _, prop := range watchEvent.List { - propValue, exist := newReported[prop] - if exist { - syncProps[prop] = propValue - } - } - pm.context.Unlock(twinID) + pm.context.SendResponseMessage(msg, msgContent)*/ - twinProperties:= &types.TwinProperties{ - Reported: syncProps, - } - dgTwin.Properties = twinProperties - dgTwin.State = savedTwin.State + //3. Report the Sync result. + reportedTwin := DumpDigitalTwin(savedTwin) + reportedTwin.Properties.Reported = syncReportedProps - twins := []*types.DigitalTwin{dgTwin} - msgContent, err := types.BuildResponseMessage(types.RequestSuccessCode, "SYNC", twins) - if err != nil { - return false - } - pm.context.SendSyncMessage(watchEvent, msgContent) - - return true - })*/ + reportedTwins := []common.DigitalTwin{*reportedTwin} + msgContent, err := common.BuildTwinMessage(reportedTwins) + if err != nil { + return err + } + pm.context.SendSyncMessage(common.CloudName, common.DGTWINS_RESOURCE_PROPERTY, msgContent) + pm.context.SendSyncMessage(common.EdgeAppName, common.DGTWINS_RESOURCE_PROPERTY, msgContent) } return nil @@ -478,7 +464,7 @@ func (pm *PropertyModule) handleMessage (msg *model.Message, fn PropActionHandle exist := pm.context.DGTwinIsExist(twinID) if !exist { // Device has not created yet. - twins := []common.DeviceTwin{dgTwin} + twins := []common.DigitalTwin{dgTwin} msgContent, err := common.BuildResponseMessage(common.NotFoundCode, "Twin Not found", twins) if err != nil { return err @@ -528,8 +514,8 @@ func (pm *PropertyModule) propResponseHandle (msg *model.Message ) error { return nil } -func (pm *PropertyModule) sendNotFoundPropMessage(msg *model.Message, msgTwin *common.DeviceTwin) error { - twins := []common.DeviceTwin{*msgTwin} +func (pm *PropertyModule) sendNotFoundPropMessage(msg *model.Message, msgTwin *common.DigitalTwin) error { + twins := []common.DigitalTwin{*msgTwin} msgContent, err := common.BuildResponseMessage(common.NotFoundCode, "twin No property/No this property", twins) if err != nil { return err @@ -539,3 +525,18 @@ func (pm *PropertyModule) sendNotFoundPropMessage(msg *model.Message, msgTwin *c return nil } + + +func DumpDigitalTwin(twin *common.DigitalTwin) *common.DigitalTwin { + if twin == nil { + return nil + } + + dgTwin := &common.DigitalTwin{ + ID: twin.ID, + State: twin.State, + LastState: twin.LastState, + } + + return dgTwin +} diff --git a/dgtwin/types/types.go b/dgtwin/types/types.go index 5d78d40..de26d68 100755 --- a/dgtwin/types/types.go +++ b/dgtwin/types/types.go @@ -11,7 +11,7 @@ const ( DGTWINS_MODULE_PROPERTY = "property" DGTWINS_MODULE_COMM = "comm" - DGTWINS_MSG_TIMEOUT = 5 //5s + DGTWINS_MSG_TIMEOUT = 1*60 //5s ) type DTMessage struct { diff --git a/docs/images/digitaltwin.png b/docs/images/digitaltwin.png index 3c79464..eed8061 100755 Binary files a/docs/images/digitaltwin.png and b/docs/images/digitaltwin.png differ diff --git a/docs/images/edgecloud.png b/docs/images/edgecloud.png new file mode 100755 index 0000000..4a2f9e9 Binary files /dev/null and b/docs/images/edgecloud.png differ diff --git a/msghub/communicate/mqtt/mqtt.go b/msghub/communicate/mqtt/mqtt.go index f18e813..499333e 100755 --- a/msghub/communicate/mqtt/mqtt.go +++ b/msghub/communicate/mqtt/mqtt.go @@ -9,6 +9,7 @@ import ( "github.com/jwzl/mqtt/client" "github.com/jwzl/wssocket/fifo" "github.com/jwzl/wssocket/model" + "github.com/jwzl/edgeOn/common" "github.com/jwzl/edgeOn/msghub/config" ) @@ -21,13 +22,14 @@ const ( ) type MqttClient struct { + isBind bool // for mqtt send thread. - mutex sync.RWMutex - conf *config.MqttConfig - client *client.Client + mutex sync.RWMutex + conf *config.MqttConfig + client *client.Client // message fifo. - messageFifo *fifo.MessageFifo -} + messageFifo *fifo.MessageFifo +} func NewMqttClient(conf *config.MqttConfig) *MqttClient { if conf == nil { @@ -60,6 +62,7 @@ func NewMqttClient(conf *config.MqttConfig) *MqttClient { c.SetTlsConfig(tlsConfig) return &MqttClient{ + isBind: false, conf: conf, client: c, messageFifo: fifo.NewMessageFifo(0), @@ -104,11 +107,26 @@ func (c *MqttClient) messageArrived(topic string, msg *model.Message){ klog.Infof("topic =(%v), msg ignored", splitString) return } - if strings.Compare(splitString[4], "comm") == 0 { + if strings.Contains(splitString[4], "comm") { // put the model message into fifo. c.messageFifo.Write(msg) + }else if strings.Contains(splitString[4], "bind") { + //report the edge information. + info := common.EdgeInfo{ + EdgeID: c.conf.ClientID, + EdgeName: "", + Description: "", + } + modelMsg := common.BuildModelMessage(common.HubModuleName, common.CloudName, + common.DGTWINS_OPS_RESPONSE, common.DGTWINS_RESOURCE_EDGE, info) + c.WriteMessage("", modelMsg) + // start go rountine to send heartbeat. + if true != c.isBind { + go c.SendHeartBeat(modelMsg) + c.isBind =true + } }else{ - //TODO: + } } @@ -128,3 +146,17 @@ func (c *MqttClient) WriteMessage(clientID string, msg *model.Message) error { pubTopic := fmt.Sprintf("%s/%s/comm", MQTT_PUBTOPIC_PREFIX, clientID) return c.client.Publish(pubTopic, msg) } + +func (c *MqttClient) SendHeartBeat(msg *model.Message){ + KeepaliveCh := time.After(120 *time.Second) + msg.Router.Operation = common.DGTWINS_OPS_KEEPALIVE + + for { + <-KeepaliveCh + + pubTopic := fmt.Sprintf("%s/%s/hearbeat", MQTT_PUBTOPIC_PREFIX, c.conf.ClientID) + c.client.Publish(pubTopic, msg) + klog.Infof("####### Send heart beat to cloud. #############") + KeepaliveCh = time.After(120 *time.Second) + } +} diff --git a/tests/app b/tests/app deleted file mode 100755 index e7cfe0f..0000000 Binary files a/tests/app and /dev/null differ