package node import ( "context" "encoding/json" "fmt" "oc-discovery/daemons/node/common" "oc-discovery/daemons/node/stream" "slices" oclib "cloud.o-forge.io/core/oc-lib" "cloud.o-forge.io/core/oc-lib/config" "cloud.o-forge.io/core/oc-lib/tools" pp "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" ) type configPayload struct { PeerID string `json:"source_peer_id"` } type executionConsidersPayload struct { PeerIDs []string `json:"peer_ids"` } func ListenNATS(n *Node) { tools.NewNATSCaller().ListenNats(map[tools.NATSMethod]func(tools.NATSResponse){ tools.PROPALGATION_EVENT: func(resp tools.NATSResponse) { fmt.Println("PROPALGATION") if resp.FromApp == config.GetAppName() { return } p, err := oclib.GetMySelf() if err != nil || p == nil || p.PeerID != n.PeerID.String() { return } var propalgation tools.PropalgationMessage err = json.Unmarshal(resp.Payload, &propalgation) var dt *tools.DataType if propalgation.DataType > 0 { dtt := tools.DataType(propalgation.DataType) dt = &dtt } fmt.Println("PROPALGATION ACT", propalgation.DataType, propalgation.Action, propalgation.Action == tools.PB_CREATE, err) if err == nil { switch propalgation.Action { case tools.PB_ADMIRALTY_CONFIG, tools.PB_MINIO_CONFIG: var m configPayload var proto protocol.ID = stream.ProtocolAdmiraltyConfigResource if propalgation.Action == tools.PB_MINIO_CONFIG { proto = stream.ProtocolMinioConfigResource } if err := json.Unmarshal(propalgation.Payload, &m); err == nil { peers, _ := n.GetPeerRecord(context.Background(), m.PeerID) for _, p := range peers { n.StreamService.PublishCommon(&resp.Datatype, resp.User, resp.Groups, p.PeerID, proto, propalgation.Payload) } } case tools.PB_CREATE, tools.PB_UPDATE, tools.PB_DELETE: if slices.Contains([]tools.DataType{tools.BOOKING, tools.PURCHASE_RESOURCE}, resp.Datatype) { m := map[string]interface{}{} if err := json.Unmarshal(propalgation.Payload, &m); err == nil { if m["peer_id"] != nil { n.StreamService.PublishCommon(&resp.Datatype, resp.User, resp.Groups, fmt.Sprintf("%v", m["peer_id"]), stream.ProtocolCreateResource, propalgation.Payload) } } } else { fmt.Println(n.StreamService.ToPartnerPublishEvent( context.Background(), propalgation.Action, dt, resp.User, resp.Groups, propalgation.Payload, )) } case tools.PB_CONSIDERS: switch resp.Datatype { case tools.BOOKING, tools.PURCHASE_RESOURCE, tools.WORKFLOW_EXECUTION: var m executionConsidersPayload if err := json.Unmarshal(propalgation.Payload, &m); err == nil { for _, p := range m.PeerIDs { peers, _ := n.GetPeerRecord(context.Background(), p) for _, pp := range peers { n.StreamService.PublishCommon(&resp.Datatype, resp.User, resp.Groups, pp.PeerID, stream.ProtocolConsidersResource, propalgation.Payload) } } } default: // minio / admiralty config considers — route back to OriginID. var m struct { OriginID string `json:"origin_id"` } if err := json.Unmarshal(propalgation.Payload, &m); err == nil && m.OriginID != "" { peers, _ := n.GetPeerRecord(context.Background(), m.OriginID) for _, p := range peers { n.StreamService.PublishCommon(nil, resp.User, resp.Groups, p.PeerID, stream.ProtocolConsidersResource, propalgation.Payload) } } } case tools.PB_PLANNER: m := map[string]interface{}{} if err := json.Unmarshal(propalgation.Payload, &m); err == nil { b := []byte{} if len(m) > 1 { b = propalgation.Payload } if m["peer_id"] == nil { // send to every active stream n.StreamService.Mu.Lock() if n.StreamService.Streams[stream.ProtocolSendPlanner] != nil { for pid := range n.StreamService.Streams[stream.ProtocolSendPlanner] { // send Planner can be long lived - it's a conn n.StreamService.PublishCommon(nil, resp.User, resp.Groups, pid.String(), stream.ProtocolSendPlanner, b) } } n.StreamService.Mu.Unlock() } else { n.StreamService.PublishCommon(nil, resp.User, resp.Groups, fmt.Sprintf("%v", m["peer_id"]), stream.ProtocolSendPlanner, b) } } case tools.PB_CLOSE_PLANNER: m := map[string]interface{}{} if err := json.Unmarshal(resp.Payload, &m); err == nil { n.StreamService.Mu.Lock() if pid, err := pp.Decode(fmt.Sprintf("%v", m["peer_id"])); err == nil { if n.StreamService.Streams[stream.ProtocolSendPlanner] != nil && n.StreamService.Streams[stream.ProtocolSendPlanner][pid] != nil { n.StreamService.Streams[stream.ProtocolSendPlanner][pid].Stream.Close() delete(n.StreamService.Streams[stream.ProtocolSendPlanner], pid) } } n.StreamService.Mu.Unlock() } case tools.PB_CLOSE_SEARCH: if propalgation.DataType == int(tools.PEER) { n.peerSearches.Cancel(resp.User) } else { n.StreamService.ResourceSearches.Cancel(resp.User) } case tools.PB_SEARCH: if propalgation.DataType == int(tools.PEER) { m := map[string]interface{}{} if err := json.Unmarshal(propalgation.Payload, &m); err == nil { needle := fmt.Sprintf("%v", m["search"]) userKey := resp.User go n.SearchPeerRecord(userKey, needle, func(hit common.SearchHit) { if b, err := json.Marshal(hit); err == nil { tools.NewNATSCaller().SetNATSPub(tools.SEARCH_EVENT, tools.NATSResponse{ FromApp: "oc-discovery", Datatype: tools.DataType(tools.PEER), Method: int(tools.SEARCH_EVENT), Payload: b, }) } }) } } else { m := map[string]interface{}{} if err := json.Unmarshal(propalgation.Payload, &m); err == nil { n.PubSubService.SearchPublishEvent( context.Background(), dt, fmt.Sprintf("%v", m["type"]), resp.User, resp.Groups, fmt.Sprintf("%v", m["search"]), ) } } } } }, }) }