128 lines
4.4 KiB
Go
128 lines
4.4 KiB
Go
|
|
package pubsub
|
||
|
|
|
||
|
|
import (
|
||
|
|
"context"
|
||
|
|
"encoding/json"
|
||
|
|
"oc-discovery/models"
|
||
|
|
"strings"
|
||
|
|
"sync"
|
||
|
|
|
||
|
|
oclib "cloud.o-forge.io/core/oc-lib"
|
||
|
|
"cloud.o-forge.io/core/oc-lib/dbs"
|
||
|
|
"cloud.o-forge.io/core/oc-lib/models/peer"
|
||
|
|
"cloud.o-forge.io/core/oc-lib/models/resources"
|
||
|
|
"cloud.o-forge.io/core/oc-lib/tools"
|
||
|
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||
|
|
)
|
||
|
|
|
||
|
|
type PubSubService struct {
|
||
|
|
PS *pubsub.PubSub
|
||
|
|
Subscription []string
|
||
|
|
mutex sync.RWMutex
|
||
|
|
}
|
||
|
|
|
||
|
|
var pubsubSingleton *PubSubService
|
||
|
|
|
||
|
|
func Init(ctx context.Context, ps *pubsub.PubSub) {
|
||
|
|
pubsubSingleton = &PubSubService{
|
||
|
|
PS: ps,
|
||
|
|
Subscription: []string{},
|
||
|
|
}
|
||
|
|
pubsubSingleton.initSubscribeEvents(ctx)
|
||
|
|
}
|
||
|
|
|
||
|
|
func GetPubSubService() *PubSubService {
|
||
|
|
return pubsubSingleton
|
||
|
|
}
|
||
|
|
|
||
|
|
func (ps *PubSubService) getTopicName(topicName string) tools.PubSubAction {
|
||
|
|
ns := strings.Split(topicName, ".")
|
||
|
|
if len(ns) > 0 {
|
||
|
|
return tools.GetActionString(ns[0])
|
||
|
|
}
|
||
|
|
return tools.NONE
|
||
|
|
}
|
||
|
|
|
||
|
|
func (abs *PubSubService) retrieveResponse(ctx context.Context, p *peer.Peer, event models.Event) error {
|
||
|
|
res, err := resources.ToResource(int(event.DataType), event.Payload)
|
||
|
|
if err != nil || res == nil {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
b, err := json.Marshal(res.Serialize(res))
|
||
|
|
tools.NewNATSCaller().SetNATSPub(tools.CATALOG_SEARCH_EVENT, tools.NATSResponse{
|
||
|
|
FromApp: "oc-discovery",
|
||
|
|
Datatype: tools.DataType(event.DataType),
|
||
|
|
Method: int(tools.CATALOG_SEARCH_EVENT),
|
||
|
|
Payload: b,
|
||
|
|
})
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func (abs *PubSubService) sendResponse(ctx context.Context, p *peer.Peer, event models.Event) error {
|
||
|
|
dts := []oclib.LibDataEnum{oclib.LibDataEnum(event.DataType)}
|
||
|
|
if event.DataType == -1 { // expect all resources
|
||
|
|
dts = []oclib.LibDataEnum{oclib.LibDataEnum(oclib.COMPUTE_RESOURCE), oclib.LibDataEnum(oclib.STORAGE_RESOURCE),
|
||
|
|
oclib.LibDataEnum(oclib.PROCESSING_RESOURCE), oclib.LibDataEnum(oclib.DATA_RESOURCE), oclib.LibDataEnum(oclib.WORKFLOW_RESOURCE)}
|
||
|
|
}
|
||
|
|
var m map[string]string
|
||
|
|
err := json.Unmarshal(event.Payload, &m)
|
||
|
|
if err != nil {
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
for _, dt := range dts {
|
||
|
|
access := oclib.NewRequestAdmin(oclib.LibDataEnum(event.DataType), nil)
|
||
|
|
peerID := p.GetID()
|
||
|
|
searched := access.Search(abs.filterPeer(peerID, m["search"]), "", false)
|
||
|
|
for _, ss := range searched.Data {
|
||
|
|
if j, err := json.Marshal(ss); err == nil {
|
||
|
|
if event.DataType != -1 {
|
||
|
|
ndt := tools.DataType(dt.EnumIndex())
|
||
|
|
abs.publishEvent(ctx, &ndt, tools.PB_SEARCH_RESPONSE, event.User, peerID, j, true)
|
||
|
|
} else {
|
||
|
|
abs.publishEvent(ctx, nil, tools.PB_SEARCH_RESPONSE, event.User, peerID, j, true)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func (abs *PubSubService) filterPeer(peerID string, search string) *dbs.Filters {
|
||
|
|
id, err := oclib.GetMySelf()
|
||
|
|
if err != nil {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
filter := map[string][]dbs.Filter{
|
||
|
|
"creator_id": {{Operator: dbs.EQUAL.String(), Value: id}}, // is my resource...
|
||
|
|
"": {{Operator: dbs.OR.String(), Value: &dbs.Filters{
|
||
|
|
Or: map[string][]dbs.Filter{
|
||
|
|
"abstractobject.access_mode": {{Operator: dbs.EQUAL.String(), Value: 1}}, // if public
|
||
|
|
"abstractinstanciatedresource.instances": {{Operator: dbs.ELEMMATCH.String(), Value: &dbs.Filters{ // or got a partners instances
|
||
|
|
And: map[string][]dbs.Filter{
|
||
|
|
"resourceinstance.partnerships": {{Operator: dbs.ELEMMATCH.String(), Value: &dbs.Filters{
|
||
|
|
And: map[string][]dbs.Filter{
|
||
|
|
"resourcepartnership.peer_groups." + peerID: {{Operator: dbs.EXISTS.String(), Value: true}},
|
||
|
|
},
|
||
|
|
}}},
|
||
|
|
},
|
||
|
|
}}},
|
||
|
|
},
|
||
|
|
}}},
|
||
|
|
}
|
||
|
|
if search != "" {
|
||
|
|
filter[" "] = []dbs.Filter{{Operator: dbs.OR.String(), Value: &dbs.Filters{
|
||
|
|
Or: map[string][]dbs.Filter{ // filter by like name, short_description, description, owner, url if no filters are provided
|
||
|
|
"abstractintanciatedresource.abstractresource.abstractobject.name": {{Operator: dbs.LIKE.String(), Value: search}},
|
||
|
|
"abstractintanciatedresource.abstractresource.type": {{Operator: dbs.LIKE.String(), Value: search}},
|
||
|
|
"abstractintanciatedresource.abstractresource.short_description": {{Operator: dbs.LIKE.String(), Value: search}},
|
||
|
|
"abstractintanciatedresource.abstractresource.description": {{Operator: dbs.LIKE.String(), Value: search}},
|
||
|
|
"abstractintanciatedresource.abstractresource.owners.name": {{Operator: dbs.LIKE.String(), Value: search}},
|
||
|
|
"abstractintanciatedresource.abstractresource.abstractobject.creator_id": {{Operator: dbs.EQUAL.String(), Value: search}},
|
||
|
|
},
|
||
|
|
}}}
|
||
|
|
}
|
||
|
|
return &dbs.Filters{
|
||
|
|
And: filter,
|
||
|
|
}
|
||
|
|
}
|