Discovery Nats Related

This commit is contained in:
mr
2026-01-28 17:23:55 +01:00
parent a9e737fb4f
commit e650ad479d
15 changed files with 221 additions and 1197 deletions

62
infrastructure/nats.go Normal file
View File

@@ -0,0 +1,62 @@
package infrastructure
import (
"encoding/json"
"fmt"
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/tools"
)
var SearchStream = map[string]chan resources.ResourceInterface{}
func EmitNATS(message tools.PropalgationMessage) {
b, _ := json.Marshal(message)
if message.Action == tools.PB_SEARCH {
SearchStream[message.User] = make(chan resources.ResourceInterface, 128)
}
tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{
FromApp: "oc-catalog",
Datatype: -1,
Method: int(tools.PROPALGATION_EVENT),
Payload: b,
})
}
func ListenNATS() {
tools.NewNATSCaller().ListenNats(map[tools.NATSMethod]func(tools.NATSResponse){
tools.CATALOG_SEARCH_EVENT: func(resp tools.NATSResponse) {
p := map[string]interface{}{}
err := json.Unmarshal(resp.Payload, &p)
if err == nil {
access := oclib.NewRequestAdmin(oclib.LibDataEnum(resp.Datatype), nil)
if data := access.LoadOne(fmt.Sprintf("%v", p["id"])); data.Data != nil {
access.UpdateOne(p, fmt.Sprintf("%v", p["id"]))
} else {
access.StoreOne(p)
}
}
},
tools.CREATE_RESOURCE: func(resp tools.NATSResponse) {
p := map[string]interface{}{}
err := json.Unmarshal(resp.Payload, &p)
if err == nil {
access := oclib.NewRequestAdmin(oclib.LibDataEnum(resp.Datatype), nil)
if data := access.LoadOne(fmt.Sprintf("%v", p["id"])); data.Data != nil {
access.UpdateOne(p, fmt.Sprintf("%v", p["id"]))
} else {
access.StoreOne(p)
}
}
},
tools.REMOVE_RESOURCE: func(resp tools.NATSResponse) {
p := map[string]interface{}{}
access := oclib.NewRequestAdmin(oclib.LibDataEnum(resp.Datatype), nil)
err := json.Unmarshal(resp.Payload, &p)
if err == nil {
access.DeleteOne(fmt.Sprintf("%v", p["id"]))
}
},
})
}

View File

@@ -1,245 +0,0 @@
package infrastructure
import (
"context"
"encoding/json"
"errors"
"time"
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/models/peer"
"cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/tools"
)
func (ps *PubSubService) searchResponsePublishEvent(
ctx context.Context,
dt *tools.DataType,
user string,
peerID string,
payload []byte,
) error {
return ps.publishEvent(ctx, dt, SEARCH_RESPONSE, user, peerID, payload, true)
}
func (ps *PubSubService) SearchPublishEvent(
ctx context.Context,
dt *tools.DataType,
typ string,
user string,
search string,
) error {
switch typ {
case "partner":
ps.searchPartnersPublishEvent(
ctx, dt, user, search,
)
case "all":
b, err := json.Marshal(map[string]string{
"search": search,
})
if err != nil {
return err
}
ps.searchPublishEvent(
ctx, dt, user, "", b,
)
case "known":
ps.searchKnownPublishEvent(
ctx, dt, user, search,
)
default:
return errors.New("no type of research found")
}
return nil
}
func (ps *PubSubService) searchPartnersPublishEvent(
ctx context.Context,
dt *tools.DataType,
user string,
search string,
) error {
access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), "", "", []string{}, nil)
f := &dbs.Filters{
And: map[string][]dbs.Filter{ // search by name if no filters are provided
"state": {{Operator: dbs.EQUAL.String(), Value: peer.ONLINE.EnumIndex()}},
"relation": {{Operator: dbs.EQUAL.String(), Value: peer.PARTNER.EnumIndex()}},
},
}
if search != "" {
f.Or = map[string][]dbs.Filter{ // filter by like name, short_description, description, owner, url if no filters are provided
"abstractintanciatedresource.abstractresource.abstractobject.name": {{Operator: dbs.LIKE.String(), Value: search}},
"abstractintanciatedresource.abstractresource.type": {{Operator: dbs.LIKE.String(), Value: search}},
"abstractintanciatedresource.abstractresource.short_description": {{Operator: dbs.LIKE.String(), Value: search}},
"abstractintanciatedresource.abstractresource.description": {{Operator: dbs.LIKE.String(), Value: search}},
"abstractintanciatedresource.abstractresource.owners.name": {{Operator: dbs.LIKE.String(), Value: search}},
"abstractintanciatedresource.abstractresource.abstractobject.creator_id": {{Operator: dbs.EQUAL.String(), Value: search}},
}
}
b, err := json.Marshal(map[string]string{
"search": search,
})
if err != nil {
return err
}
peersKnown := access.Search(f, "", false)
for _, known := range peersKnown.Data {
if err := ps.searchPublishEvent(ctx, dt, user, known.GetID(), b); err != nil {
return err
}
}
return nil
}
func (ps *PubSubService) searchKnownPublishEvent(
ctx context.Context,
dt *tools.DataType,
user string,
search string,
) error {
access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), "", "", []string{}, nil)
f := &dbs.Filters{
And: map[string][]dbs.Filter{ // search by name if no filters are provided
"state": {{Operator: dbs.EQUAL.String(), Value: peer.ONLINE.EnumIndex()}},
"relation": {{Operator: dbs.NOT.String(), Value: &dbs.Filters{
And: map[string][]dbs.Filter{
"relation": {{Operator: dbs.EQUAL.String(), Value: peer.BLACKLIST.EnumIndex()}},
},
}}},
},
}
if search != "" {
f.Or = map[string][]dbs.Filter{ // filter by like name, short_description, description, owner, url if no filters are provided
"abstractintanciatedresource.abstractresource.abstractobject.name": {{Operator: dbs.LIKE.String(), Value: search}},
"abstractintanciatedresource.abstractresource.type": {{Operator: dbs.LIKE.String(), Value: search}},
"abstractintanciatedresource.abstractresource.short_description": {{Operator: dbs.LIKE.String(), Value: search}},
"abstractintanciatedresource.abstractresource.description": {{Operator: dbs.LIKE.String(), Value: search}},
"abstractintanciatedresource.abstractresource.owners.name": {{Operator: dbs.LIKE.String(), Value: search}},
"abstractintanciatedresource.abstractresource.abstractobject.creator_id": {{Operator: dbs.EQUAL.String(), Value: search}},
}
}
b, err := json.Marshal(map[string]string{
"search": search,
})
if err != nil {
return err
}
peersKnown := access.Search(f, "", false)
for _, known := range peersKnown.Data {
if err := ps.searchPublishEvent(ctx, dt, user, known.GetID(), b); err != nil {
return err
}
}
return nil
}
func (ps *PubSubService) searchPublishEvent(
ctx context.Context,
dt *tools.DataType,
user string,
peerID string,
payload []byte,
) error {
id, err := oclib.GenerateNodeID()
if err != nil {
return err
}
ps.SearchStream[user] = make(chan resources.ResourceInterface, 128) // set up the searchStream
if err := ps.subscribeEvents(ctx, dt, SEARCH_RESPONSE, id, 60); err != nil { // TODO Catpure Event !
return err
}
return ps.publishEvent(ctx, dt, SEARCH, user, peerID, payload, false)
}
func (ps *PubSubService) CreatePublishEvent(
ctx context.Context,
dt *tools.DataType,
user string,
payload []byte,
) error {
id, err := oclib.GenerateNodeID()
if err != nil {
return err
}
return ps.publishEvent(ctx, dt, CREATE, user, id, payload, false)
}
func (ps *PubSubService) UpdatePublishEvent(
ctx context.Context,
dt *tools.DataType,
user string,
payload []byte,
) error {
id, err := oclib.GenerateNodeID()
if err != nil {
return err
}
return ps.publishEvent(ctx, dt, UPDATE, user, id, payload, false)
}
func (ps *PubSubService) DeletePublishEvent(
ctx context.Context,
dt *tools.DataType,
user string,
payload []byte,
) error {
id, err := oclib.GenerateNodeID()
if err != nil {
return err
}
return ps.publishEvent(ctx, dt, DELETE, user, id, payload, false)
}
func (ps *PubSubService) publishEvent(
ctx context.Context,
dt *tools.DataType,
action PubSubAction,
user string,
peerID string,
payload []byte,
chanNamedByDt bool,
) error {
name := "oc-catalog." + action.String() + "#" + peerID
if chanNamedByDt && dt != nil { // if a datatype is precised then : app.action.datatype#peerID
name = "oc-catalog." + action.String() + "." + (*dt).String() + "#" + peerID
}
from, err := oclib.GenerateNodeID()
if err != nil {
return err
}
evt := Event{
Type: name,
From: from,
User: user,
Timestamp: time.Now().Unix(),
Payload: payload,
}
if dt != nil {
evt.DataType = int64(dt.EnumIndex())
} else {
evt.DataType = -1
}
body, _ := json.Marshal(evt)
priv, err := LoadKeyFromFile(false)
if err != nil {
return err
}
sig, _ := priv.Sign(body)
evt.Signature = sig
msg, _ := json.Marshal(evt)
topic, err := ps.PS.Join(name)
if err != nil {
return err
}
return topic.Publish(ctx, msg)
}
// TODO REVIEW PUBLISHING + ADD SEARCH ON PUBLIC : YES
// TODO : Search should verify DataType

View File

@@ -1,346 +0,0 @@
package infrastructure
import (
"context"
"encoding/json"
"errors"
"strings"
"sync"
"time"
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/tools"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/rs/zerolog"
)
type PubSubAction int
const (
SEARCH PubSubAction = iota
SEARCH_RESPONSE
CREATE
UPDATE
DELETE
NONE
)
func GetActionString(ss string) PubSubAction {
switch ss {
case "search":
return SEARCH
case "create":
return CREATE
case "update":
return UPDATE
case "delete":
return DELETE
default:
return NONE
}
}
var path = []string{"search", "create", "update", "delete"}
func (m PubSubAction) String() string {
return strings.ToUpper(path[m])
}
type PubSubService struct {
PS *pubsub.PubSub
Subscription []string
mutex sync.RWMutex
SearchStream map[string]chan resources.ResourceInterface
}
var Singleton *PubSubService
type Event struct {
Type string `json:"type"`
From string `json:"from"` // peerID
User string
DataType int64 `json:"datatype"`
Timestamp int64 `json:"ts"`
Payload []byte `json:"payload"`
Signature []byte `json:"sig"`
}
func (e *Event) rawEvent() *Event {
return &Event{
Type: e.Type,
From: e.From,
User: e.User,
DataType: e.DataType,
Timestamp: e.Timestamp,
Payload: e.Payload,
}
}
func (ps *PubSubService) handleEvent(ctx context.Context, topicName string, evt Event) error {
action := ps.getTopicName(topicName)
if err := ps.handleEventFromPartner(topicName, action, evt); err != nil {
return err
}
if err := ps.eventSearch(ctx, evt, action); err != nil {
return err
}
return nil
}
func (ps *PubSubService) getTopicName(
topicName string) PubSubAction {
if !strings.Contains(topicName, "catalog.") {
return NONE
}
n := strings.ReplaceAll(topicName, "catalog.", "")
ns := strings.Split(n, ".")
if len(ns) > 0 {
return GetActionString(ns[0])
}
return NONE
}
func (ps *PubSubService) eventSearch( // only : on partner followings. 3 canals for every partner.
ctx context.Context,
evt Event,
action PubSubAction,
) error {
var data oclib.LibData
logger := oclib.GetLogger()
switch action {
case SEARCH_RESPONSE:
access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), "", "", []string{}, nil)
peers := access.Search(nil, evt.From, false)
if len(peers.Data) > 0 {
if err := ps.retrieveResponse(ctx, peers, evt, logger); err != nil {
return err
}
} else {
access = oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), "", "", []string{}, nil)
tools.NewNATSCaller().SetNATSPub(tools.PEER_DISCOVERY, map[string]string{
"peer_id": evt.From, // peerside
})
time.Sleep(30 * time.Second)
peers = access.Search(nil, evt.From, false)
if len(peers.Data) > 0 { // if found... ok... if not found ignore
if err := ps.retrieveResponse(ctx, peers, evt, logger); err != nil {
return err
}
}
}
case SEARCH: // when someone ask for search.
access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), "", "", []string{}, nil)
peers := access.Search(nil, evt.From, false)
if len(peers.Data) > 0 {
if err := ps.sendResponse(ctx, peers, evt); err != nil {
return err
}
} else {
access = oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), "", "", []string{}, nil)
tools.NewNATSCaller().SetNATSPub(tools.PEER_DISCOVERY, map[string]string{
"peer_id": evt.From, // peerside
})
time.Sleep(30 * time.Second)
peers = access.Search(nil, evt.From, false)
if len(peers.Data) > 0 { // if found... ok... if not found ignore
if err := ps.sendResponse(ctx, peers, evt); err != nil {
return err
}
}
}
default:
return nil
}
if data.Err != "" {
return errors.New(data.Err)
}
return nil
}
// TODO i should KNOW WHO IS ASKING !!!
func (abs *PubSubService) retrieveResponse(ctx context.Context, peers oclib.LibDataShallow, event Event, logger zerolog.Logger) error {
if err := abs.verifyPeer(peers, event); err != nil {
return err
}
res, err := abs.toResource(int(event.DataType), event.Payload)
if err != nil || res == nil {
return nil
}
access := oclib.NewRequestAdmin(oclib.LibDataEnum(event.DataType), "", "", []string{}, nil)
if data := access.LoadOne(res.GetID()); data.Data != nil && data.Err == "" {
if newData := access.UpdateOne(res.Serialize(res), res.GetID()); newData.Err != "" {
return errors.New(newData.Err)
}
} else {
if newData := access.StoreOne(res.Serialize(res)); newData.Err != "" {
return errors.New(newData.Err)
}
}
if abs.SearchStream[event.User] == nil {
ctx.Done()
return errors.New("no stream opened for user " + event.User)
}
select {
case abs.SearchStream[event.User] <- res:
case <-ctx.Done():
return errors.New("client too slow")
}
return nil
}
func (abs *PubSubService) sendResponse(ctx context.Context, peers oclib.LibDataShallow, event Event) error {
if err := abs.verifyPeer(peers, event); err != nil {
return err
}
dts := []oclib.LibDataEnum{oclib.LibDataEnum(event.DataType)}
if event.DataType == -1 { // expect all resources
dts = []oclib.LibDataEnum{oclib.LibDataEnum(oclib.COMPUTE_RESOURCE), oclib.LibDataEnum(oclib.STORAGE_RESOURCE),
oclib.LibDataEnum(oclib.PROCESSING_RESOURCE), oclib.LibDataEnum(oclib.DATA_RESOURCE), oclib.LibDataEnum(oclib.WORKFLOW_RESOURCE)}
}
var m map[string]string
err := json.Unmarshal(event.Payload, &m)
if err != nil {
return err
}
for _, dt := range dts {
access := oclib.NewRequestAdmin(oclib.LibDataEnum(event.DataType), "", "", []string{}, nil)
peerID := peers.Data[0].GetID()
searched := access.Search(abs.FilterPeer(peerID, m["search"]), "", false)
for _, ss := range searched.Data {
if j, err := json.Marshal(ss); err == nil {
if event.DataType != -1 {
ndt := tools.DataType(dt.EnumIndex())
abs.searchResponsePublishEvent(ctx, &ndt, event.User, peerID, j)
} else {
abs.searchResponsePublishEvent(ctx, nil, event.User, peerID, j)
}
}
}
}
return nil
}
func (abs *PubSubService) FilterPeer(peerID string, search string) *dbs.Filters {
id, err := oclib.GetMySelf()
if err != nil {
return nil
}
filter := map[string][]dbs.Filter{
"creator_id": {{Operator: dbs.EQUAL.String(), Value: id}}, // is my resource...
"": {{Operator: dbs.OR.String(), Value: &dbs.Filters{
Or: map[string][]dbs.Filter{
"abstractobject.access_mode": {{Operator: dbs.EQUAL.String(), Value: 1}}, // if public
"abstractinstanciatedresource.instances": {{Operator: dbs.ELEMMATCH.String(), Value: &dbs.Filters{ // or got a partners instances
And: map[string][]dbs.Filter{
"resourceinstance.partnerships": {{Operator: dbs.ELEMMATCH.String(), Value: &dbs.Filters{
And: map[string][]dbs.Filter{
"resourcepartnership.peer_groups." + peerID: {{Operator: dbs.EXISTS.String(), Value: true}},
},
}}},
},
}}},
},
}}},
}
if search != "" {
filter[" "] = []dbs.Filter{{Operator: dbs.OR.String(), Value: &dbs.Filters{
Or: map[string][]dbs.Filter{ // filter by like name, short_description, description, owner, url if no filters are provided
"abstractintanciatedresource.abstractresource.abstractobject.name": {{Operator: dbs.LIKE.String(), Value: search}},
"abstractintanciatedresource.abstractresource.type": {{Operator: dbs.LIKE.String(), Value: search}},
"abstractintanciatedresource.abstractresource.short_description": {{Operator: dbs.LIKE.String(), Value: search}},
"abstractintanciatedresource.abstractresource.description": {{Operator: dbs.LIKE.String(), Value: search}},
"abstractintanciatedresource.abstractresource.owners.name": {{Operator: dbs.LIKE.String(), Value: search}},
"abstractintanciatedresource.abstractresource.abstractobject.creator_id": {{Operator: dbs.EQUAL.String(), Value: search}},
},
}}}
}
return &dbs.Filters{
And: filter,
}
}
func (ps *PubSubService) handleEventFromPartner(
topicName string,
action PubSubAction,
evt Event) error {
if action == CREATE || action == UPDATE || action == DELETE {
return ps.eventFromPartner(evt, action)
}
return nil
}
func (ps *PubSubService) eventFromPartner( // only : on partner followings. 3 canals for every partner.
evt Event,
action PubSubAction,
) error {
access := oclib.NewRequestAdmin(oclib.LibDataEnum(evt.DataType), "", "", []string{}, nil)
ressource, err := ps.toResource(int(evt.DataType), evt.Payload)
if err != nil {
return err
}
var data oclib.LibData
switch action {
case CREATE:
data = access.StoreOne(ressource.Serialize(ressource))
case UPDATE:
if data := access.LoadOne(ressource.GetID()); data.Data == nil {
data = access.StoreOne(ressource.Serialize(ressource))
} else {
data = access.UpdateOne(ressource.Serialize(ressource), ressource.GetID())
}
case DELETE:
data = access.DeleteOne(ressource.GetID())
default:
return errors.New("no action authorized available : " + action.String())
}
if data.Err != "" {
return errors.New(data.Err)
}
return nil
}
func (ps *PubSubService) toResource(
dt int,
payload []byte,
) (resources.ResourceInterface, error) {
switch dt {
case oclib.PROCESSING_RESOURCE.EnumIndex():
var data resources.ProcessingResource
if err := json.Unmarshal(payload, &data); err != nil {
return nil, err
}
return &data, nil
case oclib.WORKFLOW_RESOURCE.EnumIndex():
var data resources.WorkflowResource
if err := json.Unmarshal(payload, &data); err != nil {
return nil, err
}
return &data, nil
case oclib.DATA_RESOURCE.EnumIndex():
var data resources.DataResource
if err := json.Unmarshal(payload, &data); err != nil {
return nil, err
}
return &data, nil
case oclib.STORAGE_RESOURCE.EnumIndex():
var data resources.StorageResource
if err := json.Unmarshal(payload, &data); err != nil {
return nil, err
}
return &data, nil
case oclib.COMPUTE_RESOURCE.EnumIndex():
var data resources.ComputeResource
if err := json.Unmarshal(payload, &data); err != nil {
return nil, err
}
return &data, nil
}
return nil, errors.New("can't found any data resources matching")
}

View File

@@ -1,208 +0,0 @@
package infrastructure
import (
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"slices"
"time"
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/models/peer"
"cloud.o-forge.io/core/oc-lib/tools"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/crypto"
)
func (ps *PubSubService) InitSubscribeEvents(ctx context.Context) error {
ourPeerID, err := oclib.GenerateNodeID()
if err != nil {
return err
}
// subscribe :
if err := ps.subscribeEvents(ctx, nil, SEARCH, "", -1); err != nil { // we subscribe at our proprer deductible search adresse.
return err
}
if err := ps.subscribeEvents(ctx, nil, SEARCH, ourPeerID, -1); err != nil { // we subscribe at our proprer deductible search adresse.
return err
}
if err := ps.PartnersSubscribeEvents(ctx); err != nil {
return nil
}
return nil
}
func (ps *PubSubService) SearchSubscribeEvents(
ctx context.Context,
priv crypto.PrivKey,
dt tools.DataType,
id string,
payload []byte,
) error {
return ps.subscribeEvents(ctx, &dt, CREATE, id, 10)
}
func (ps *PubSubService) PartnersSubscribeEvents(
ctx context.Context,
) error {
access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), "", "", []string{}, nil)
peers := access.Search(nil, fmt.Sprintf("%v", peer.PARTNER.EnumIndex()), false)
for _, p := range peers.Data {
loadedPeer := access.LoadOne(p.GetID())
rp := loadedPeer.ToPeer()
if rp == nil {
continue
}
if err := ps.PartnerSubscribeEvents(ctx, rp.PeerID); err != nil {
return err
}
}
return nil
}
func (ps *PubSubService) PartnerSubscribeEvents(
ctx context.Context,
peerID string,
) error {
if peerID == "" {
return errors.New("should discover a particular peer")
}
if err := ps.subscribeEvents(ctx, nil, CREATE, peerID, -1); err != nil {
return err
}
if err := ps.subscribeEvents(ctx, nil, UPDATE, peerID, -1); err != nil {
return err
}
if err := ps.subscribeEvents(ctx, nil, DELETE, peerID, -1); err != nil {
return err
}
return nil
}
// generic function to subscribe to DHT flow of event
func (ps *PubSubService) subscribeEvents(
ctx context.Context,
dt *tools.DataType,
action PubSubAction,
peerID string,
timeout int,
) error {
// define a name app.action#peerID
name := "oc-catalog." + action.String() + "#" + peerID
if dt != nil { // if a datatype is precised then : app.action.datatype#peerID
name = "oc-catalog." + action.String() + "." + (*dt).String() + "#" + peerID
}
topic, err := ps.PS.Join(name) // find out the topic
if err != nil {
return err
}
sub, err := topic.Subscribe() // then subscribe to it
if err != nil {
return err
}
ps.mutex.Lock() // add safely in cache your subscription.
ps.Subscription = append(ps.Subscription, name)
ps.mutex.Unlock()
// launch loop waiting for results.
go ps.waitResults(ctx, sub, name, timeout)
return nil
}
func (ps *PubSubService) waitResults(ctx context.Context, sub *pubsub.Subscription, topicName string, timeout int) {
logger := oclib.GetLogger()
defer ctx.Done()
for {
ps.mutex.Lock() // check safely if cache is actually notified subscribed to topic
if !slices.Contains(ps.Subscription, topicName) { // if not kill the loop.
break
}
ps.mutex.Unlock()
// if still subscribed -> wait for new message
var cancel context.CancelFunc
if timeout != -1 {
ctx, cancel = context.WithTimeout(ctx, time.Duration(timeout)*time.Second)
defer cancel()
}
msg, err := sub.Next(ctx)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
// timeout hit, no message before deadline kill subsciption.
ps.mutex.Lock()
subs := []string{}
for _, ss := range ps.Subscription {
if ss != topicName {
subs = append(subs, ss)
}
}
ps.Subscription = subs
ps.mutex.Unlock()
return
}
continue
}
var evt Event
if err := json.Unmarshal(msg.Data, &evt); err != nil { // map to event
continue
}
access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), "", "", []string{}, nil)
peers := access.Search(nil, evt.From, false)
if len(peers.Data) > 0 { // then we check if the peer is friendly or not : Partner or None...
if err := ps.processEventPeerKnown(ctx, peers, evt, topicName); err != nil {
logger.Err(err)
}
} else {
tools.NewNATSCaller().SetNATSPub(tools.PEER_DISCOVERY, map[string]string{
"peer_id": evt.From,
})
time.Sleep(30 * time.Second)
peers = access.Search(nil, evt.From, false)
if len(peers.Data) > 0 { // if found... ok... if not found ignore
if err := ps.processEventPeerKnown(ctx, peers, evt, topicName); err != nil {
logger.Err(err)
}
}
}
}
}
func (ps *PubSubService) processEventPeerKnown(ctx context.Context, peers oclib.LibDataShallow, event Event, topicName string) error {
if err := ps.verifyPeer(peers, event); err != nil {
return err
}
ps.handleEvent(ctx, topicName, event)
return nil
}
func (pc *PubSubService) verifyPeer(peers oclib.LibDataShallow, event Event) error {
access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), "", "", []string{}, nil)
loadedPeer := access.LoadOne(peers.Data[0].GetID())
rp := loadedPeer.ToPeer()
if rp == nil || rp.Relation == peer.BLACKLIST { // if peer is blacklisted... quit...
return errors.New("peer is blacklisted")
}
pubKey, err := PubKeyFromString(rp.PublicKey) // extract pubkey from pubkey str
if err != nil {
return errors.New("pubkey is malformed")
}
data, _ := json.Marshal(event.rawEvent()) // extract byte from raw event excluding signature.
if ok, _ := pubKey.Verify(data, event.Signature); !ok { // then verify if pubkey sign this message...
return errors.New("check signature failed")
}
return nil
}
func PubKeyFromString(s string) (crypto.PubKey, error) {
data, err := base64.StdEncoding.DecodeString(s)
if err != nil {
return nil, err
}
return crypto.UnmarshalPublicKey(data)
}

View File

@@ -1,43 +0,0 @@
package infrastructure
import (
"bytes"
"oc-catalog/conf"
"os"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/pnet"
)
func LoadKeyFromFile(isPublic bool) (crypto.PrivKey, error) {
path := conf.GetConfig().PrivateKeyPath
if isPublic {
path = conf.GetConfig().PublicKeyPath
}
data, err := os.ReadFile(path)
if err != nil {
return nil, err
}
// Try to unmarshal as libp2p private key (supports ed25519, rsa, etc.)
priv, err := crypto.UnmarshalPrivateKey(data)
if err != nil {
return nil, err
}
return priv, nil
}
func LoadPSKFromFile() (pnet.PSK, error) {
path := conf.GetConfig().PSKPath
data, err := os.ReadFile(path)
if err != nil {
return nil, err
}
// Try to unmarshal as libp2p private key (supports ed25519, rsa, etc.)
psk, err := pnet.DecodeV1PSK(bytes.NewReader(data))
if err != nil {
return nil, err
}
return psk, nil
}