Files
oc-catalog/infrastructure/pubsub.go
2026-01-27 15:39:53 +01:00

347 lines
10 KiB
Go

package infrastructure
import (
"context"
"encoding/json"
"errors"
"strings"
"sync"
"time"
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/tools"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/rs/zerolog"
)
// PubSubAction identifies the action carried by a catalog pubsub topic.
type PubSubAction int

// The numeric value of each action follows the declaration order below.
const (
	SEARCH PubSubAction = iota
	SEARCH_RESPONSE
	CREATE
	UPDATE
	DELETE
	NONE
)

// GetActionString converts the lowercase action segment of a topic name into
// its PubSubAction. Any unrecognised input yields NONE.
//
// NOTE(review): no input maps to SEARCH_RESPONSE, so a caller relying only on
// this function never obtains that action — confirm which topic segment is
// expected for search responses.
func GetActionString(ss string) PubSubAction {
	switch ss {
	case "search":
		return SEARCH
	case "create":
		return CREATE
	case "update":
		return UPDATE
	case "delete":
		return DELETE
	default:
		return NONE
	}
}

// path lists the lowercase action segments understood by GetActionString.
// It is NOT index-aligned with the PubSubAction constants (SEARCH_RESPONSE and
// NONE have no entry), so it must not be indexed with a PubSubAction value.
var path = []string{"search", "create", "update", "delete"}

// String returns the upper-case name of the action.
//
// The name is resolved explicitly per constant: indexing path with the numeric
// value returned shifted names for SEARCH_RESPONSE, CREATE and UPDATE and
// panicked for DELETE and NONE. Values outside the declared constants yield
// "UNKNOWN".
func (m PubSubAction) String() string {
	switch m {
	case SEARCH:
		return "SEARCH"
	case SEARCH_RESPONSE:
		return "SEARCH_RESPONSE"
	case CREATE:
		return "CREATE"
	case UPDATE:
		return "UPDATE"
	case DELETE:
		return "DELETE"
	case NONE:
		return "NONE"
	default:
		return "UNKNOWN"
	}
}
// PubSubService groups the libp2p pubsub handle with the state used by the
// catalog event handlers defined in this file.
type PubSubService struct {
// PS is the libp2p pubsub instance.
PS *pubsub.PubSub
// Subscription is a list of strings — presumably the subscribed topic names;
// confirm against the code that fills it.
Subscription []string
// mutex is not taken by any handler visible in this part of the file.
// NOTE(review): presumably meant to guard SearchStream — confirm.
mutex sync.RWMutex
// SearchStream maps a user (Event.User) to the channel on which resources
// decoded from search responses are delivered.
SearchStream map[string]chan resources.ResourceInterface
}
// Singleton is the package-level PubSubService pointer; it is nil until
// assigned (the assignment is not part of this section).
var Singleton *PubSubService
// Event is the envelope exchanged on catalog topics.
type Event struct {
	Type      string `json:"type"`
	From      string `json:"from"` // peerID
	User      string // no json tag: encoded under the key "User"
	DataType  int64  `json:"datatype"`
	Timestamp int64  `json:"ts"`
	Payload   []byte `json:"payload"`
	Signature []byte `json:"sig"`
}

// rawEvent returns a new Event carrying every field of e except Signature,
// which is left nil. The Payload slice is shared with e, not copied.
// NOTE(review): presumably this is the form that gets signed/verified —
// confirm against the signing code.
func (e *Event) rawEvent() *Event {
	unsigned := *e
	unsigned.Signature = nil
	return &unsigned
}
// handleEvent resolves the action encoded in topicName and runs the event
// through the partner handler first, then through the search handler. The
// search handler is skipped when the partner handler reports an error.
func (ps *PubSubService) handleEvent(ctx context.Context, topicName string, evt Event) error {
	action := ps.getTopicName(topicName)
	err := ps.handleEventFromPartner(topicName, action, evt)
	if err == nil {
		err = ps.eventSearch(ctx, evt, action)
	}
	return err
}
// getTopicName extracts the action from a topic name. Topics that do not
// contain "catalog." yield NONE; otherwise every "catalog." occurrence is
// removed and the text before the first remaining '.' is handed to
// GetActionString.
func (ps *PubSubService) getTopicName(
	topicName string) PubSubAction {
	const marker = "catalog."
	if !strings.Contains(topicName, marker) {
		return NONE
	}
	head := strings.ReplaceAll(topicName, marker, "")
	if dot := strings.IndexByte(head, '.'); dot >= 0 {
		head = head[:dot]
	}
	return GetActionString(head)
}
// eventSearch handles SEARCH and SEARCH_RESPONSE events; every other action is
// ignored and yields nil.
// (original note: only on partner followings, 3 channels for every partner.)
//
// The sender (evt.From) is looked up among the known peers. When it is not
// found, a PEER_DISCOVERY message is published on NATS and the lookup is
// retried once after 30 seconds; if the peer is still unknown the event is
// dropped without error. A known peer is then passed to sendResponse (SEARCH)
// or retrieveResponse (SEARCH_RESPONSE), whose error is returned.
//
// The 30 second wait is interruptible: if ctx is cancelled first, ctx.Err()
// is returned instead of blocking the caller for the full delay.
func (ps *PubSubService) eventSearch(
	ctx context.Context,
	evt Event,
	action PubSubAction,
) error {
	if action != SEARCH && action != SEARCH_RESPONSE {
		return nil
	}

	// Delay between the discovery request and the second peer lookup.
	const peerDiscoveryWait = 30 * time.Second

	access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), "", "", []string{}, nil)
	peers := access.Search(nil, evt.From, false)
	if len(peers.Data) == 0 {
		tools.NewNATSCaller().SetNATSPub(tools.PEER_DISCOVERY, map[string]string{
			"peer_id": evt.From, // peerside
		})
		timer := time.NewTimer(peerDiscoveryWait)
		select {
		case <-ctx.Done():
			timer.Stop()
			return ctx.Err()
		case <-timer.C:
		}
		peers = access.Search(nil, evt.From, false)
		if len(peers.Data) == 0 { // still unknown: ignore the event
			return nil
		}
	}

	if action == SEARCH_RESPONSE {
		return ps.retrieveResponse(ctx, peers, evt, oclib.GetLogger())
	}
	return ps.sendResponse(ctx, peers, evt) // when someone asks for a search
}
// retrieveResponse processes one search response received from a peer.
//
// After verifyPeer accepts the sender, the payload is decoded according to
// event.DataType, upserted locally (updated when LoadOne finds it, stored
// otherwise) and finally pushed on the search stream opened for event.User.
//
// A payload that cannot be decoded is logged and ignored (nil is returned).
// Errors are returned when peer verification fails, when the store/update
// fails, when no stream exists for the user, or when ctx ends before the
// stream accepts the resource.
//
// TODO i should KNOW WHO IS ASKING !!!
func (abs *PubSubService) retrieveResponse(ctx context.Context, peers oclib.LibDataShallow, event Event, logger zerolog.Logger) error {
	if err := abs.verifyPeer(peers, event); err != nil {
		return err
	}
	res, err := abs.toResource(int(event.DataType), event.Payload)
	if err != nil || res == nil {
		// Best effort: an undecodable response is dropped, but no longer silently.
		logger.Warn().Err(err).Str("from", event.From).Msg("ignoring undecodable search response")
		return nil
	}
	access := oclib.NewRequestAdmin(oclib.LibDataEnum(event.DataType), "", "", []string{}, nil)
	if existing := access.LoadOne(res.GetID()); existing.Data != nil && existing.Err == "" {
		if updated := access.UpdateOne(res.Serialize(res), res.GetID()); updated.Err != "" {
			return errors.New(updated.Err)
		}
	} else if stored := access.StoreOne(res.Serialize(res)); stored.Err != "" {
		return errors.New(stored.Err)
	}
	// Read the map once so the nil check and the send use the same channel.
	// NOTE(review): SearchStream is read without taking abs.mutex — confirm
	// whether writers elsewhere require a lock here.
	stream := abs.SearchStream[event.User]
	if stream == nil {
		return errors.New("no stream opened for user " + event.User)
	}
	select {
	case stream <- res:
	case <-ctx.Done():
		return errors.New("client too slow")
	}
	return nil
}
// sendResponse answers a search request coming from a peer.
//
// After verifyPeer accepts the sender, the payload is decoded as a JSON object
// of strings whose "search" entry is the search text. For each requested data
// type (the single event.DataType, or the five resource types when
// event.DataType is -1) the local resources visible to the requesting peer
// (see FilterPeer) are searched and each hit is published through
// searchResponsePublishEvent.
//
// Each data type is queried through its own request; building the request from
// event.DataType inside the loop meant the "all resources" case never queried
// the five real resource collections.
//
// Errors are returned when peer verification fails, when the payload is not
// valid JSON, or when the visibility filter cannot be built. Resources that
// cannot be marshalled are skipped.
func (abs *PubSubService) sendResponse(ctx context.Context, peers oclib.LibDataShallow, event Event) error {
	if err := abs.verifyPeer(peers, event); err != nil {
		return err
	}
	dts := []oclib.LibDataEnum{oclib.LibDataEnum(event.DataType)}
	if event.DataType == -1 { // expect all resources
		dts = []oclib.LibDataEnum{oclib.LibDataEnum(oclib.COMPUTE_RESOURCE), oclib.LibDataEnum(oclib.STORAGE_RESOURCE),
			oclib.LibDataEnum(oclib.PROCESSING_RESOURCE), oclib.LibDataEnum(oclib.DATA_RESOURCE), oclib.LibDataEnum(oclib.WORKFLOW_RESOURCE)}
	}
	var m map[string]string
	if err := json.Unmarshal(event.Payload, &m); err != nil {
		return err
	}
	peerID := peers.Data[0].GetID()
	for _, dt := range dts {
		filter := abs.FilterPeer(peerID, m["search"])
		if filter == nil {
			// FilterPeer returns nil when the local peer cannot be resolved.
			// Refuse rather than hand a nil filter to Search, which presumably
			// would not restrict results to what this peer may see.
			return errors.New("cannot build the resource filter for peer " + peerID)
		}
		access := oclib.NewRequestAdmin(dt, "", "", []string{}, nil)
		searched := access.Search(filter, "", false)
		for _, ss := range searched.Data {
			j, err := json.Marshal(ss)
			if err != nil {
				continue // skip resources that cannot be encoded
			}
			// NOTE(review): the data type is only forwarded when a single type
			// was requested; in the "all resources" case nil is passed —
			// confirm the receiver can still decode such responses.
			if event.DataType != -1 {
				ndt := tools.DataType(dt.EnumIndex())
				abs.searchResponsePublishEvent(ctx, &ndt, event.User, peerID, j)
			} else {
				abs.searchResponsePublishEvent(ctx, nil, event.User, peerID, j)
			}
		}
	}
	return nil
}
// FilterPeer builds the filter used to answer a search coming from peerID.
//
// The filter ANDs together:
//   - "creator_id" equal to the value returned by oclib.GetMySelf();
//   - an OR between public access mode (access_mode == 1) and the existence of
//     an instance partnership whose peer_groups contains peerID;
//   - when search is not empty, an OR over name, type, short description,
//     description and owner name (LIKE) and creator_id (EQUAL).
//
// It returns nil when oclib.GetMySelf() fails.
//
// NOTE(review): the instances key is spelled "abstractinstanciatedresource"
// while the text-search keys use "abstractintanciatedresource" — confirm which
// spelling the stored documents use.
func (abs *PubSubService) FilterPeer(peerID string, search string) *dbs.Filters {
	id, err := oclib.GetMySelf()
	if err != nil {
		return nil
	}

	// Built inside-out: partnership -> instance -> visibility.
	partnership := &dbs.Filters{
		And: map[string][]dbs.Filter{
			"resourcepartnership.peer_groups." + peerID: {{Operator: dbs.EXISTS.String(), Value: true}},
		},
	}
	instance := &dbs.Filters{ // or got a partners instances
		And: map[string][]dbs.Filter{
			"resourceinstance.partnerships": {{Operator: dbs.ELEMMATCH.String(), Value: partnership}},
		},
	}
	visibility := &dbs.Filters{
		Or: map[string][]dbs.Filter{
			"abstractobject.access_mode":             {{Operator: dbs.EQUAL.String(), Value: 1}}, // if public
			"abstractinstanciatedresource.instances": {{Operator: dbs.ELEMMATCH.String(), Value: instance}},
		},
	}

	conditions := map[string][]dbs.Filter{
		"creator_id": {{Operator: dbs.EQUAL.String(), Value: id}}, // is my resource...
		"":           {{Operator: dbs.OR.String(), Value: visibility}},
	}

	if search != "" {
		// filter by like name, short_description, description, owner, url if no filters are provided
		textual := map[string][]dbs.Filter{}
		for _, field := range []string{
			"abstractintanciatedresource.abstractresource.abstractobject.name",
			"abstractintanciatedresource.abstractresource.type",
			"abstractintanciatedresource.abstractresource.short_description",
			"abstractintanciatedresource.abstractresource.description",
			"abstractintanciatedresource.abstractresource.owners.name",
		} {
			textual[field] = []dbs.Filter{{Operator: dbs.LIKE.String(), Value: search}}
		}
		textual["abstractintanciatedresource.abstractresource.abstractobject.creator_id"] = []dbs.Filter{
			{Operator: dbs.EQUAL.String(), Value: search},
		}
		// The single-space key keeps this clause distinct from the "" key above.
		conditions[" "] = []dbs.Filter{{Operator: dbs.OR.String(), Value: &dbs.Filters{Or: textual}}}
	}

	return &dbs.Filters{And: conditions}
}
// handleEventFromPartner forwards CREATE, UPDATE and DELETE events to
// eventFromPartner and ignores every other action. topicName is not used.
func (ps *PubSubService) handleEventFromPartner(
	topicName string,
	action PubSubAction,
	evt Event) error {
	switch action {
	case CREATE, UPDATE, DELETE:
		return ps.eventFromPartner(evt, action)
	default:
		return nil
	}
}
// eventFromPartner applies a CREATE, UPDATE or DELETE event to the local
// collection matching evt.DataType.
// (original note: only on partner followings, 3 channels for every partner.)
//
// The payload is decoded with toResource. CREATE stores the resource, UPDATE
// stores it when LoadOne finds nothing and updates it otherwise, DELETE
// removes it by ID. Any other action is rejected with an error.
//
// It returns the decoding error, or the error reported by the store, update
// or delete operation.
func (ps *PubSubService) eventFromPartner(
	evt Event,
	action PubSubAction,
) error {
	access := oclib.NewRequestAdmin(oclib.LibDataEnum(evt.DataType), "", "", []string{}, nil)
	ressource, err := ps.toResource(int(evt.DataType), evt.Payload)
	if err != nil {
		return err
	}
	var data oclib.LibData
	switch action {
	case CREATE:
		data = access.StoreOne(ressource.Serialize(ressource))
	case UPDATE:
		// The lookup result gets its own name: declaring it as `data` shadowed
		// the outer variable, so store/update failures were never reported.
		if existing := access.LoadOne(ressource.GetID()); existing.Data == nil {
			data = access.StoreOne(ressource.Serialize(ressource))
		} else {
			data = access.UpdateOne(ressource.Serialize(ressource), ressource.GetID())
		}
	case DELETE:
		data = access.DeleteOne(ressource.GetID())
	default:
		return errors.New("no action authorized available : " + action.String())
	}
	if data.Err != "" {
		return errors.New(data.Err)
	}
	return nil
}
// toResource decodes a JSON payload into the concrete resource type selected
// by dt (processing, workflow, data, storage or compute). It returns the
// json.Unmarshal error when decoding fails, and an error when dt matches none
// of the supported resource types.
func (ps *PubSubService) toResource(
	dt int,
	payload []byte,
) (resources.ResourceInterface, error) {
	var target resources.ResourceInterface
	switch dt {
	case oclib.PROCESSING_RESOURCE.EnumIndex():
		target = &resources.ProcessingResource{}
	case oclib.WORKFLOW_RESOURCE.EnumIndex():
		target = &resources.WorkflowResource{}
	case oclib.DATA_RESOURCE.EnumIndex():
		target = &resources.DataResource{}
	case oclib.STORAGE_RESOURCE.EnumIndex():
		target = &resources.StorageResource{}
	case oclib.COMPUTE_RESOURCE.EnumIndex():
		target = &resources.ComputeResource{}
	default:
		return nil, errors.New("can't found any data resources matching")
	}
	// target holds a pointer to the chosen struct, so Unmarshal fills it in place.
	if err := json.Unmarshal(payload, target); err != nil {
		return nil, err
	}
	return target, nil
}