diff --git a/controllers/compute.go b/controllers/compute.go index 132b948..0acbfac 100755 --- a/controllers/compute.go +++ b/controllers/compute.go @@ -32,10 +32,9 @@ func (o *ComputeController) Put() { data := oclib.NewRequest(comp_collection, user, peerID, groups, nil).UpdateOne(res, id) if data.Err == "" { data, _ := json.Marshal(data.Data.Serialize(data.Data)) - infrastructure.EmitNATS(tools.PropalgationMessage{ + infrastructure.EmitNATS(user, tools.PropalgationMessage{ Action: tools.PB_UPDATE, DataType: comp_dt.EnumIndex(), - User: user, Payload: data, }) } @@ -55,10 +54,9 @@ func (o *ComputeController) Post() { data := oclib.NewRequest(comp_collection, user, peerID, groups, nil).StoreOne(res) if data.Err == "" { data, _ := json.Marshal(data.Data.Serialize(data.Data)) - infrastructure.EmitNATS(tools.PropalgationMessage{ + infrastructure.EmitNATS(user, tools.PropalgationMessage{ Action: tools.PB_CREATE, DataType: comp_dt.EnumIndex(), - User: user, Payload: data, }) } @@ -115,10 +113,9 @@ func (o *ComputeController) Delete() { data := oclib.NewRequest(comp_collection, user, peerID, groups, nil).DeleteOne(id) if data.Err == "" { data, _ := json.Marshal(data.Data.Serialize(data.Data)) - infrastructure.EmitNATS(tools.PropalgationMessage{ + infrastructure.EmitNATS(user, tools.PropalgationMessage{ Action: tools.PB_DELETE, DataType: comp_dt.EnumIndex(), - User: user, Payload: data, }) } @@ -140,10 +137,9 @@ func (o *ComputeController) SearchDecentralized() { "search": search, "type": t, }) - infrastructure.EmitNATS(tools.PropalgationMessage{ + infrastructure.EmitNATS(user, tools.PropalgationMessage{ Action: tools.PB_SEARCH, DataType: comp_dt.EnumIndex(), - User: user, Payload: b, }) if err != nil { diff --git a/controllers/data.go b/controllers/data.go index 8293805..7692b17 100755 --- a/controllers/data.go +++ b/controllers/data.go @@ -32,10 +32,9 @@ func (o *DataController) Put() { data := oclib.NewRequest(data_collection, user, peerID, groups, nil).UpdateOne(res, id) if data.Err == "" { data, _ := json.Marshal(data.Data.Serialize(data.Data)) - infrastructure.EmitNATS(tools.PropalgationMessage{ + infrastructure.EmitNATS(user, tools.PropalgationMessage{ Action: tools.PB_UPDATE, DataType: data_dt.EnumIndex(), - User: user, Payload: data, }) } @@ -55,10 +54,9 @@ func (o *DataController) Post() { data := oclib.NewRequest(data_collection, user, peerID, groups, nil).StoreOne(res) if data.Err == "" { data, _ := json.Marshal(data.Data.Serialize(data.Data)) - infrastructure.EmitNATS(tools.PropalgationMessage{ + infrastructure.EmitNATS(user, tools.PropalgationMessage{ Action: tools.PB_CREATE, DataType: data_dt.EnumIndex(), - User: user, Payload: data, }) } @@ -116,10 +114,9 @@ func (o *DataController) Delete() { data := oclib.NewRequest(data_collection, user, peerID, groups, nil).DeleteOne(id) if data.Err == "" { data, _ := json.Marshal(data.Data.Serialize(data.Data)) - infrastructure.EmitNATS(tools.PropalgationMessage{ + infrastructure.EmitNATS(user, tools.PropalgationMessage{ Action: tools.PB_DELETE, DataType: data_dt.EnumIndex(), - User: user, Payload: data, }) } @@ -141,10 +138,9 @@ func (o *DataController) SearchDecentralized() { "search": search, "type": t, }) - infrastructure.EmitNATS(tools.PropalgationMessage{ + infrastructure.EmitNATS(user, tools.PropalgationMessage{ Action: tools.PB_SEARCH, DataType: data_dt.EnumIndex(), - User: user, Payload: b, }) if err != nil { diff --git a/controllers/processing.go b/controllers/processing.go index b30faca..94378f3 100755 --- a/controllers/processing.go +++ b/controllers/processing.go @@ -32,10 +32,9 @@ func (o *ProcessingController) Put() { data := oclib.NewRequest(processing_collection, user, peerID, groups, nil).UpdateOne(res, id) if data.Err == "" { data, _ := json.Marshal(data.Data.Serialize(data.Data)) - infrastructure.EmitNATS(tools.PropalgationMessage{ + infrastructure.EmitNATS(user, tools.PropalgationMessage{ Action: tools.PB_UPDATE, DataType: processing_dt.EnumIndex(), - User: user, Payload: data, }) } @@ -55,10 +54,9 @@ func (o *ProcessingController) Post() { data := oclib.NewRequest(processing_collection, user, peerID, groups, nil).StoreOne(res) if data.Err == "" { data, _ := json.Marshal(data.Data.Serialize(data.Data)) - infrastructure.EmitNATS(tools.PropalgationMessage{ + infrastructure.EmitNATS(user, tools.PropalgationMessage{ Action: tools.PB_CREATE, DataType: processing_dt.EnumIndex(), - User: user, Payload: data, }) } @@ -115,10 +113,9 @@ func (o *ProcessingController) Delete() { data := oclib.NewRequest(processing_collection, user, peerID, groups, nil).DeleteOne(id) if data.Err == "" { data, _ := json.Marshal(data.Data.Serialize(data.Data)) - infrastructure.EmitNATS(tools.PropalgationMessage{ + infrastructure.EmitNATS(user, tools.PropalgationMessage{ Action: tools.PB_DELETE, DataType: processing_dt.EnumIndex(), - User: user, Payload: data, }) } @@ -140,10 +137,9 @@ func (o *ProcessingController) SearchDecentralized() { "search": search, "type": t, }) - infrastructure.EmitNATS(tools.PropalgationMessage{ + infrastructure.EmitNATS(user, tools.PropalgationMessage{ Action: tools.PB_SEARCH, DataType: processing_dt.EnumIndex(), - User: user, Payload: b, }) if err != nil { diff --git a/controllers/resource.go b/controllers/resource.go index 0f25395..9254e30 100755 --- a/controllers/resource.go +++ b/controllers/resource.go @@ -101,10 +101,9 @@ func (o *ResourceController) SearchDecentralized() { "search": search, "type": t, }) - infrastructure.EmitNATS(tools.PropalgationMessage{ + infrastructure.EmitNATS(user, tools.PropalgationMessage{ Action: tools.PB_SEARCH, DataType: -1, - User: user, Payload: b, }) if err != nil { diff --git a/controllers/storage.go b/controllers/storage.go index 1d459c1..17c8a32 100755 --- a/controllers/storage.go +++ b/controllers/storage.go @@ -32,10 +32,9 @@ func (o *StorageController) Put() { data := oclib.NewRequest(storage_collection, user, peerID, groups, nil).UpdateOne(res, id) if data.Err == "" { data, _ := json.Marshal(data.Data.Serialize(data.Data)) - infrastructure.EmitNATS(tools.PropalgationMessage{ + infrastructure.EmitNATS(user, tools.PropalgationMessage{ Action: tools.PB_UPDATE, DataType: storage_dt.EnumIndex(), - User: user, Payload: data, }) } @@ -55,10 +54,9 @@ func (o *StorageController) Post() { data := oclib.NewRequest(storage_collection, user, peerID, groups, nil).StoreOne(res) if data.Err == "" { data, _ := json.Marshal(data.Data.Serialize(data.Data)) - infrastructure.EmitNATS(tools.PropalgationMessage{ + infrastructure.EmitNATS(user, tools.PropalgationMessage{ Action: tools.PB_CREATE, DataType: storage_dt.EnumIndex(), - User: user, Payload: data, }) } @@ -115,10 +113,9 @@ func (o *StorageController) Delete() { data := oclib.NewRequest(storage_collection, user, peerID, groups, nil).DeleteOne(id) if data.Err == "" { data, _ := json.Marshal(data.Data.Serialize(data.Data)) - infrastructure.EmitNATS(tools.PropalgationMessage{ + infrastructure.EmitNATS(user, tools.PropalgationMessage{ Action: tools.PB_DELETE, DataType: storage_dt.EnumIndex(), - User: user, Payload: data, }) } @@ -140,10 +137,9 @@ func (o *StorageController) SearchDecentralized() { "search": search, "type": t, }) - infrastructure.EmitNATS(tools.PropalgationMessage{ + infrastructure.EmitNATS(user, tools.PropalgationMessage{ Action: tools.PB_SEARCH, DataType: storage_dt.EnumIndex(), - User: user, Payload: b, }) if err != nil { diff --git a/controllers/workflow.go b/controllers/workflow.go index 0c85a3c..067408e 100755 --- a/controllers/workflow.go +++ b/controllers/workflow.go @@ -32,11 +32,10 @@ func (o *WorkflowController) Put() { data := oclib.NewRequest(workflow_collection, user, peerID, groups, nil).UpdateOne(res, id) if data.Err == "" { data, _ := json.Marshal(data.Data.Serialize(data.Data)) - infrastructure.EmitNATS(tools.PropalgationMessage{ + infrastructure.EmitNATS(user, tools.PropalgationMessage{ Action: tools.PB_UPDATE, Payload: data, DataType: workflow_dt.EnumIndex(), - User: user, }) } o.Data["json"] = data @@ -55,10 +54,9 @@ func (o *WorkflowController) Post() { data := oclib.NewRequest(workflow_collection, user, peerID, groups, nil).StoreOne(res) if data.Err == "" { data, _ := json.Marshal(data.Data.Serialize(data.Data)) - infrastructure.EmitNATS(tools.PropalgationMessage{ + infrastructure.EmitNATS(user, tools.PropalgationMessage{ Action: tools.PB_CREATE, DataType: workflow_dt.EnumIndex(), - User: user, Payload: data, }) } @@ -115,10 +113,9 @@ func (o *WorkflowController) SearchDecentralized() { o.ServeJSON() return } - infrastructure.EmitNATS(tools.PropalgationMessage{ + infrastructure.EmitNATS(user, tools.PropalgationMessage{ Action: tools.PB_SEARCH, DataType: workflow_dt.EnumIndex(), - User: user, Payload: b, }) Websocket(o.Ctx.Request.Context(), user, o.Ctx.ResponseWriter, o.Ctx.Request) @@ -147,10 +144,9 @@ func (o *WorkflowController) Delete() { data := oclib.NewRequest(workflow_collection, user, peerID, groups, nil).DeleteOne(id) if data.Err == "" { data, _ := json.Marshal(data.Data.Serialize(data.Data)) - infrastructure.EmitNATS(tools.PropalgationMessage{ + infrastructure.EmitNATS(user, tools.PropalgationMessage{ DataType: workflow_dt.EnumIndex(), Action: tools.PB_DELETE, - User: user, Payload: data, }) } diff --git a/go.mod b/go.mod index 0af94d3..23f6de9 100755 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.24 toolchain go1.24.0 require ( - cloud.o-forge.io/core/oc-lib v0.0.0-20260128160440-c0d89ea9e1e8 + cloud.o-forge.io/core/oc-lib v0.0.0-20260128162702-97cf629e27ec github.com/beego/beego/v2 v2.3.4 github.com/smartystreets/goconvey v1.7.2 ) diff --git a/go.sum b/go.sum index 808e512..28b03fb 100755 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ cloud.o-forge.io/core/oc-lib v0.0.0-20260128160440-c0d89ea9e1e8 h1:h7VHJktaTT8TxO4ld3Xjw3LzMsivr3m7mzbNxb44zes= cloud.o-forge.io/core/oc-lib v0.0.0-20260128160440-c0d89ea9e1e8/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI= +cloud.o-forge.io/core/oc-lib v0.0.0-20260128162702-97cf629e27ec h1:/uvrtEt7A5rwqFPHH8yjujlC33HMjQHhWDIK6I08DrA= +cloud.o-forge.io/core/oc-lib v0.0.0-20260128162702-97cf629e27ec/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/beego/beego/v2 v2.3.4 h1:HurQEOGIEhLlPFCTR6ZDuQkybrUl2Ag2i6CdVD2rGiI= github.com/beego/beego/v2 v2.3.4/go.mod h1:5cqHsOHJIxkq44tBpRvtDe59GuVRVv/9/tyVDxd5ce4= diff --git a/infrastructure/nats.go b/infrastructure/nats.go index 6635da8..a30eb21 100644 --- a/infrastructure/nats.go +++ b/infrastructure/nats.go @@ -11,10 +11,10 @@ import ( var SearchStream = map[string]chan resources.ResourceInterface{} -func EmitNATS(message tools.PropalgationMessage) { +func EmitNATS(user string, message tools.PropalgationMessage) { b, _ := json.Marshal(message) if message.Action == tools.PB_SEARCH { - SearchStream[message.User] = make(chan resources.ResourceInterface, 128) + SearchStream[user] = make(chan resources.ResourceInterface, 128) } tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{ FromApp: "oc-catalog", @@ -27,15 +27,9 @@ func EmitNATS(message tools.PropalgationMessage) { func ListenNATS() { tools.NewNATSCaller().ListenNats(map[tools.NATSMethod]func(tools.NATSResponse){ tools.CATALOG_SEARCH_EVENT: func(resp tools.NATSResponse) { - p := map[string]interface{}{} - err := json.Unmarshal(resp.Payload, &p) + p, err := resources.ToResource(int(resp.Datatype), resp.Payload) if err == nil { - access := oclib.NewRequestAdmin(oclib.LibDataEnum(resp.Datatype), nil) - if data := access.LoadOne(fmt.Sprintf("%v", p["id"])); data.Data != nil { - access.UpdateOne(p, fmt.Sprintf("%v", p["id"])) - } else { - access.StoreOne(p) - } + SearchStream[resp.User] <- p } }, tools.CREATE_RESOURCE: func(resp tools.NATSResponse) {