Daemons Search
This commit is contained in:
139
daemons/pubsub/subscribe.go
Normal file
139
daemons/pubsub/subscribe.go
Normal file
@@ -0,0 +1,139 @@
|
||||
package pubsub
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"oc-discovery/daemons"
|
||||
"oc-discovery/daemons/dht"
|
||||
"oc-discovery/models"
|
||||
"slices"
|
||||
"time"
|
||||
|
||||
oclib "cloud.o-forge.io/core/oc-lib"
|
||||
"cloud.o-forge.io/core/oc-lib/models/peer"
|
||||
"cloud.o-forge.io/core/oc-lib/tools"
|
||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||
)
|
||||
|
||||
func (ps *PubSubService) initSubscribeEvents(ctx context.Context) error {
|
||||
ourPeerID, err := oclib.GenerateNodeID()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := ps.subscribeEvents(ctx, nil, tools.PB_SEARCH, "", -1); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := ps.subscribeEvents(ctx, nil, tools.PB_SEARCH, ourPeerID, -1); err != nil { // we subscribe at our proprer deductible search adresse.
|
||||
return err
|
||||
}
|
||||
if err := ps.initPartnersSubscribeEvents(ctx); err != nil {
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ps *PubSubService) initPartnersSubscribeEvents(ctx context.Context) error {
|
||||
// search all your partners : we check in base for this because we keep actively peer state if partners
|
||||
access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil)
|
||||
peers := access.Search(nil, fmt.Sprintf("%v", peer.PARTNER.EnumIndex()), false)
|
||||
for _, p := range peers.Data {
|
||||
if err := ps.PartnerSubscribeEvents(ctx, p.(*peer.Peer).PeerID); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ps *PubSubService) PartnerSubscribeEvents(ctx context.Context, myPartnerPeerID string) error {
|
||||
if myPartnerPeerID == "" {
|
||||
return errors.New("should discover a particular partner peer")
|
||||
}
|
||||
for _, action := range []tools.PubSubAction{tools.PB_CREATE, tools.PB_UPDATE, tools.PB_DELETE} {
|
||||
if err := ps.subscribeEvents(ctx, nil, action, myPartnerPeerID, -1); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// generic function to subscribe to DHT flow of event
|
||||
func (ps *PubSubService) subscribeEvents(
|
||||
ctx context.Context, dt *tools.DataType, action tools.PubSubAction, peerID string, timeout int,
|
||||
) error {
|
||||
// define a name app.action#peerID
|
||||
name := action.String() + "#" + peerID
|
||||
if dt != nil { // if a datatype is precised then : app.action.datatype#peerID
|
||||
name = action.String() + "." + (*dt).String() + "#" + peerID
|
||||
}
|
||||
topic, err := ps.PS.Join(name) // find out the topic
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
sub, err := topic.Subscribe() // then subscribe to it
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ps.mutex.Lock() // add safely in cache your subscription.
|
||||
ps.Subscription = append(ps.Subscription, name)
|
||||
ps.mutex.Unlock()
|
||||
|
||||
// launch loop waiting for results.
|
||||
go ps.waitResults(ctx, sub, name, timeout)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ps *PubSubService) waitResults(ctx context.Context, sub *pubsub.Subscription, topicName string, timeout int) {
|
||||
logger := oclib.GetLogger()
|
||||
defer ctx.Done()
|
||||
for {
|
||||
ps.mutex.Lock() // check safely if cache is actually notified subscribed to topic
|
||||
if !slices.Contains(ps.Subscription, topicName) { // if not kill the loop.
|
||||
break
|
||||
}
|
||||
ps.mutex.Unlock()
|
||||
// if still subscribed -> wait for new message
|
||||
var cancel context.CancelFunc
|
||||
if timeout != -1 {
|
||||
ctx, cancel = context.WithTimeout(ctx, time.Duration(timeout)*time.Second)
|
||||
defer cancel()
|
||||
}
|
||||
msg, err := sub.Next(ctx)
|
||||
if err != nil {
|
||||
if errors.Is(err, context.DeadlineExceeded) {
|
||||
// timeout hit, no message before deadline kill subsciption.
|
||||
ps.mutex.Lock()
|
||||
subs := []string{}
|
||||
for _, ss := range ps.Subscription {
|
||||
if ss != topicName {
|
||||
subs = append(subs, ss)
|
||||
}
|
||||
}
|
||||
ps.Subscription = subs
|
||||
ps.mutex.Unlock()
|
||||
return
|
||||
}
|
||||
continue
|
||||
}
|
||||
var evt models.Event
|
||||
if err := json.Unmarshal(msg.Data, &evt); err != nil { // map to event
|
||||
continue
|
||||
}
|
||||
if p, err := dht.GetDHTService().GetPeer(ctx, evt.From); err == nil {
|
||||
if err := ps.processEventPeerKnown(ctx, p, evt, topicName); err != nil {
|
||||
logger.Err(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (ps *PubSubService) processEventPeerKnown(
|
||||
ctx context.Context, p *peer.Peer, event models.Event, topicName string) error {
|
||||
if err := daemons.VerifyPeer([]*peer.Peer{p}, event); err != nil {
|
||||
return err
|
||||
}
|
||||
return ps.handleEvent(ctx, topicName, event)
|
||||
}
|
||||
Reference in New Issue
Block a user