package infrastructure import ( "context" "encoding/json" "errors" "strings" "sync" "time" oclib "cloud.o-forge.io/core/oc-lib" "cloud.o-forge.io/core/oc-lib/dbs" "cloud.o-forge.io/core/oc-lib/models/resources" "cloud.o-forge.io/core/oc-lib/tools" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/rs/zerolog" ) type PubSubAction int const ( SEARCH PubSubAction = iota SEARCH_RESPONSE CREATE UPDATE DELETE NONE ) func GetActionString(ss string) PubSubAction { switch ss { case "search": return SEARCH case "create": return CREATE case "update": return UPDATE case "delete": return DELETE default: return NONE } } var path = []string{"search", "create", "update", "delete"} func (m PubSubAction) String() string { return strings.ToUpper(path[m]) } type PubSubService struct { PS *pubsub.PubSub Subscription []string mutex sync.RWMutex SearchStream map[string]chan resources.ResourceInterface } var Singleton *PubSubService type Event struct { Type string `json:"type"` From string `json:"from"` // peerID User string DataType int64 `json:"datatype"` Timestamp int64 `json:"ts"` Payload []byte `json:"payload"` Signature []byte `json:"sig"` } func (e *Event) rawEvent() *Event { return &Event{ Type: e.Type, From: e.From, User: e.User, DataType: e.DataType, Timestamp: e.Timestamp, Payload: e.Payload, } } func (ps *PubSubService) handleEvent(ctx context.Context, topicName string, evt Event) error { action := ps.getTopicName(topicName) if err := ps.handleEventFromPartner(topicName, action, evt); err != nil { return err } if err := ps.eventSearch(ctx, evt, action); err != nil { return err } return nil } func (ps *PubSubService) getTopicName( topicName string) PubSubAction { if !strings.Contains(topicName, "catalog.") { return NONE } n := strings.ReplaceAll(topicName, "catalog.", "") ns := strings.Split(n, ".") if len(ns) > 0 { return GetActionString(ns[0]) } return NONE } func (ps *PubSubService) eventSearch( // only : on partner followings. 3 canals for every partner. ctx context.Context, evt Event, action PubSubAction, ) error { var data oclib.LibData logger := oclib.GetLogger() switch action { case SEARCH_RESPONSE: access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), "", "", []string{}, nil) peers := access.Search(nil, evt.From, false) if len(peers.Data) > 0 { if err := ps.retrieveResponse(ctx, peers, evt, logger); err != nil { return err } } else { access = oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), "", "", []string{}, nil) tools.NewNATSCaller().SetNATSPub(tools.PEER_DISCOVERY, map[string]string{ "peer_id": evt.From, // peerside }) time.Sleep(30 * time.Second) peers = access.Search(nil, evt.From, false) if len(peers.Data) > 0 { // if found... ok... if not found ignore if err := ps.retrieveResponse(ctx, peers, evt, logger); err != nil { return err } } } case SEARCH: // when someone ask for search. access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), "", "", []string{}, nil) peers := access.Search(nil, evt.From, false) if len(peers.Data) > 0 { if err := ps.sendResponse(ctx, peers, evt); err != nil { return err } } else { access = oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), "", "", []string{}, nil) tools.NewNATSCaller().SetNATSPub(tools.PEER_DISCOVERY, map[string]string{ "peer_id": evt.From, // peerside }) time.Sleep(30 * time.Second) peers = access.Search(nil, evt.From, false) if len(peers.Data) > 0 { // if found... ok... if not found ignore if err := ps.sendResponse(ctx, peers, evt); err != nil { return err } } } default: return nil } if data.Err != "" { return errors.New(data.Err) } return nil } // TODO i should KNOW WHO IS ASKING !!! func (abs *PubSubService) retrieveResponse(ctx context.Context, peers oclib.LibDataShallow, event Event, logger zerolog.Logger) error { if err := abs.verifyPeer(peers, event); err != nil { return err } res, err := abs.toResource(int(event.DataType), event.Payload) if err != nil || res == nil { return nil } access := oclib.NewRequestAdmin(oclib.LibDataEnum(event.DataType), "", "", []string{}, nil) if data := access.LoadOne(res.GetID()); data.Data != nil && data.Err == "" { if newData := access.UpdateOne(res.Serialize(res), res.GetID()); newData.Err != "" { return errors.New(newData.Err) } } else { if newData := access.StoreOne(res.Serialize(res)); newData.Err != "" { return errors.New(newData.Err) } } if abs.SearchStream[event.User] == nil { ctx.Done() return errors.New("no stream opened for user " + event.User) } select { case abs.SearchStream[event.User] <- res: case <-ctx.Done(): return errors.New("client too slow") } return nil } func (abs *PubSubService) sendResponse(ctx context.Context, peers oclib.LibDataShallow, event Event) error { if err := abs.verifyPeer(peers, event); err != nil { return err } dts := []oclib.LibDataEnum{oclib.LibDataEnum(event.DataType)} if event.DataType == -1 { // expect all resources dts = []oclib.LibDataEnum{oclib.LibDataEnum(oclib.COMPUTE_RESOURCE), oclib.LibDataEnum(oclib.STORAGE_RESOURCE), oclib.LibDataEnum(oclib.PROCESSING_RESOURCE), oclib.LibDataEnum(oclib.DATA_RESOURCE), oclib.LibDataEnum(oclib.WORKFLOW_RESOURCE)} } var m map[string]string err := json.Unmarshal(event.Payload, &m) if err != nil { return err } for _, dt := range dts { access := oclib.NewRequestAdmin(oclib.LibDataEnum(event.DataType), "", "", []string{}, nil) peerID := peers.Data[0].GetID() searched := access.Search(abs.FilterPeer(peerID, m["search"]), "", false) for _, ss := range searched.Data { if j, err := json.Marshal(ss); err == nil { if event.DataType != -1 { ndt := tools.DataType(dt.EnumIndex()) abs.searchResponsePublishEvent(ctx, &ndt, event.User, peerID, j) } else { abs.searchResponsePublishEvent(ctx, nil, event.User, peerID, j) } } } } return nil } func (abs *PubSubService) FilterPeer(peerID string, search string) *dbs.Filters { id, err := oclib.GetMySelf() if err != nil { return nil } filter := map[string][]dbs.Filter{ "creator_id": {{Operator: dbs.EQUAL.String(), Value: id}}, // is my resource... "": {{Operator: dbs.OR.String(), Value: &dbs.Filters{ Or: map[string][]dbs.Filter{ "abstractobject.access_mode": {{Operator: dbs.EQUAL.String(), Value: 1}}, // if public "abstractinstanciatedresource.instances": {{Operator: dbs.ELEMMATCH.String(), Value: &dbs.Filters{ // or got a partners instances And: map[string][]dbs.Filter{ "resourceinstance.partnerships": {{Operator: dbs.ELEMMATCH.String(), Value: &dbs.Filters{ And: map[string][]dbs.Filter{ "resourcepartnership.peer_groups." + peerID: {{Operator: dbs.EXISTS.String(), Value: true}}, }, }}}, }, }}}, }, }}}, } if search != "" { filter[" "] = []dbs.Filter{{Operator: dbs.OR.String(), Value: &dbs.Filters{ Or: map[string][]dbs.Filter{ // filter by like name, short_description, description, owner, url if no filters are provided "abstractintanciatedresource.abstractresource.abstractobject.name": {{Operator: dbs.LIKE.String(), Value: search}}, "abstractintanciatedresource.abstractresource.type": {{Operator: dbs.LIKE.String(), Value: search}}, "abstractintanciatedresource.abstractresource.short_description": {{Operator: dbs.LIKE.String(), Value: search}}, "abstractintanciatedresource.abstractresource.description": {{Operator: dbs.LIKE.String(), Value: search}}, "abstractintanciatedresource.abstractresource.owners.name": {{Operator: dbs.LIKE.String(), Value: search}}, "abstractintanciatedresource.abstractresource.abstractobject.creator_id": {{Operator: dbs.EQUAL.String(), Value: search}}, }, }}} } return &dbs.Filters{ And: filter, } } func (ps *PubSubService) handleEventFromPartner( topicName string, action PubSubAction, evt Event) error { if action == CREATE || action == UPDATE || action == DELETE { return ps.eventFromPartner(evt, action) } return nil } func (ps *PubSubService) eventFromPartner( // only : on partner followings. 3 canals for every partner. evt Event, action PubSubAction, ) error { access := oclib.NewRequestAdmin(oclib.LibDataEnum(evt.DataType), "", "", []string{}, nil) ressource, err := ps.toResource(int(evt.DataType), evt.Payload) if err != nil { return err } var data oclib.LibData switch action { case CREATE: data = access.StoreOne(ressource.Serialize(ressource)) case UPDATE: if data := access.LoadOne(ressource.GetID()); data.Data == nil { data = access.StoreOne(ressource.Serialize(ressource)) } else { data = access.UpdateOne(ressource.Serialize(ressource), ressource.GetID()) } case DELETE: data = access.DeleteOne(ressource.GetID()) default: return errors.New("no action authorized available : " + action.String()) } if data.Err != "" { return errors.New(data.Err) } return nil } func (ps *PubSubService) toResource( dt int, payload []byte, ) (resources.ResourceInterface, error) { switch dt { case oclib.PROCESSING_RESOURCE.EnumIndex(): var data resources.ProcessingResource if err := json.Unmarshal(payload, &data); err != nil { return nil, err } return &data, nil case oclib.WORKFLOW_RESOURCE.EnumIndex(): var data resources.WorkflowResource if err := json.Unmarshal(payload, &data); err != nil { return nil, err } return &data, nil case oclib.DATA_RESOURCE.EnumIndex(): var data resources.DataResource if err := json.Unmarshal(payload, &data); err != nil { return nil, err } return &data, nil case oclib.STORAGE_RESOURCE.EnumIndex(): var data resources.StorageResource if err := json.Unmarshal(payload, &data); err != nil { return nil, err } return &data, nil case oclib.COMPUTE_RESOURCE.EnumIndex(): var data resources.ComputeResource if err := json.Unmarshal(payload, &data); err != nil { return nil, err } return &data, nil } return nil, errors.New("can't found any data resources matching") }