From 562d86125e57e02380f0685eba4a5783fc5afe0b Mon Sep 17 00:00:00 2001 From: mr Date: Fri, 30 Jan 2026 16:57:36 +0100 Subject: [PATCH] FULL OC-DISCOVERY LOGIC --- conf/config.go | 15 +- daemons/dht/dht.go | 429 -------------------- daemons/{pubsub => }/nats.go | 10 +- daemons/node/common/common_pubsub.go | 148 +++++++ daemons/node/common/common_stream.go | 385 ++++++++++++++++++ daemons/{utils.go => node/common/crypto.go} | 2 +- daemons/node/common/interface.go | 11 + daemons/node/indexer/handler.go | 189 +++++++++ daemons/node/indexer/service.go | 49 +++ daemons/node/node.go | 235 +++++++++++ daemons/node/pubsub/handler.go | 42 ++ daemons/node/pubsub/publish.go | 82 ++++ daemons/node/pubsub/service.go | 45 ++ daemons/{ => node}/pubsub/subscribe.go | 51 +-- daemons/node/stream/handler.go | 123 ++++++ daemons/node/stream/publish.go | 150 +++++++ daemons/node/stream/service.go | 228 +++++++++++ daemons/pubsub/handler.go | 89 ---- daemons/pubsub/publish.go | 205 ---------- daemons/pubsub/service.go | 127 ------ go.mod | 2 +- go.sum | 4 + main.go | 49 ++- models/event.go | 28 +- 24 files changed, 1769 insertions(+), 929 deletions(-) delete mode 100644 daemons/dht/dht.go rename daemons/{pubsub => }/nats.go (85%) create mode 100644 daemons/node/common/common_pubsub.go create mode 100644 daemons/node/common/common_stream.go rename daemons/{utils.go => node/common/crypto.go} (99%) create mode 100644 daemons/node/common/interface.go create mode 100644 daemons/node/indexer/handler.go create mode 100644 daemons/node/indexer/service.go create mode 100644 daemons/node/node.go create mode 100644 daemons/node/pubsub/handler.go create mode 100644 daemons/node/pubsub/publish.go create mode 100644 daemons/node/pubsub/service.go rename daemons/{ => node}/pubsub/subscribe.go (59%) create mode 100644 daemons/node/stream/handler.go create mode 100644 daemons/node/stream/publish.go create mode 100644 daemons/node/stream/service.go delete mode 100644 daemons/pubsub/handler.go delete mode 100644 daemons/pubsub/publish.go delete mode 100644 daemons/pubsub/service.go diff --git a/conf/config.go b/conf/config.go index eb67188..a5c22f4 100644 --- a/conf/config.go +++ b/conf/config.go @@ -3,14 +3,15 @@ package conf import "sync" type Config struct { - Name string - Hostname string - PSKPath string - PublicKeyPath string - PrivateKeyPath string - DHTEndpointPort int64 + Name string + Hostname string + PSKPath string + PublicKeyPath string + PrivateKeyPath string + NodeEndpointPort int64 + IndexerAddresses string - BootstrapAddresses string + NodeMode string } var instance *Config diff --git a/daemons/dht/dht.go b/daemons/dht/dht.go deleted file mode 100644 index 8c075ed..0000000 --- a/daemons/dht/dht.go +++ /dev/null @@ -1,429 +0,0 @@ -package dht - -import ( - "context" - "encoding/base64" - "encoding/json" - "errors" - "fmt" - "oc-discovery/conf" - "oc-discovery/daemons" - "slices" - "strings" - "sync" - "time" - - oclib "cloud.o-forge.io/core/oc-lib" - pp "cloud.o-forge.io/core/oc-lib/models/peer" - "cloud.o-forge.io/core/oc-lib/models/utils" - "cloud.o-forge.io/core/oc-lib/tools" - "github.com/google/uuid" - "github.com/libp2p/go-libp2p" - dht "github.com/libp2p/go-libp2p-kad-dht" - kad_dht "github.com/libp2p/go-libp2p-kad-dht" - "github.com/libp2p/go-libp2p/core/crypto" - "github.com/libp2p/go-libp2p/core/host" - "github.com/libp2p/go-libp2p/core/peer" -) - -type DHTRecord struct { - Name string `json:"name"` - State int `json:"state"` - DID string `json:"did"` - PeerID string `json:"peer_id"` - PubKey []byte `json:"pub_key"` - URL string `json:"url"` - NATSUrl string `json:"nats_url"` - Wallet string `json:"wallet"` - Signature []byte `json:"signature"` - ExpiryDate time.Time `json:"expiry_date"` -} - -type DHTService struct { - Key string - Host host.Host - DHT *dht.IpfsDHT - Cache []string - mutex sync.RWMutex -} - -// TODO kick connection to base... and send on NATS boy -var dhtSingletonService *DHTService - -func GetDHTService() *DHTService { - return dhtSingletonService -} - -func Init(ctx context.Context) (*DHTService, error) { - service := &DHTService{} - priv, err := daemons.LoadKeyFromFile(false) - if err != nil { - return nil, err - } - psk, err := daemons.LoadPSKFromFile() - if err != nil { - return nil, err - } - h, err := libp2p.New( - libp2p.PrivateNetwork(psk), - libp2p.Identity(priv), - libp2p.ListenAddrStrings( - fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", conf.GetConfig().DHTEndpointPort), - ), - ) - if err != nil { - return nil, err - } - service.Host = h - service.DHT, err = kad_dht.New(ctx, h, kad_dht.MaxRecordAge(24*time.Hour)) // every day DHT will purge expired data... if not used. - if err != nil { - return nil, err - } - err = service.DHT.Bootstrap(ctx) - if err != nil { - return nil, err - } - - for _, address := range strings.Split(conf.GetConfig().BootstrapAddresses, ",") { - pi, err := peer.AddrInfoFromString(address) - if err != nil { - return nil, err - } - logger := oclib.GetLogger() - if err := h.Connect(ctx, *pi); err != nil { - logger.Err(fmt.Errorf("Failed to connect to MAIN bootstrap peer %s: %s", pi.ID, err)) - } else { - logger.Info().Msg(fmt.Sprintf("Connected to MAIN bootstrap peer %s", pi.ID)) - } - } - - dhtSingletonService = service - if daemons.VerifyPubWithPriv() { - if _, err := dhtSingletonService.ClaimName(context.Background(), - conf.GetConfig().Name, - conf.GetConfig().Hostname, false); err == nil { - go service.Heartbeat(ctx, 2*time.Minute) - go service.RefreshKeys(ctx, 30*time.Minute) - } - } - - return service, nil -} - -func (d *DHTService) Heartbeat(ctx context.Context, interval time.Duration) { - ticker := time.NewTicker(interval) - go func() { - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - dhtSingletonService.ClaimName(context.Background(), conf.GetConfig().Name, conf.GetConfig().Hostname, true) - } - } - }() -} - -func (d *DHTService) RefreshKeys(ctx context.Context, interval time.Duration) { - ticker := time.NewTicker(interval) - go func() { - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - s := []string{} - d.mutex.Lock() - s = append(s, d.Cache...) - d.mutex.Unlock() - for _, key := range s { - _, _ = d.GetValue(ctx, key) - } - } - } - }() -} - -func (d *DHTService) PutValue( - ctx context.Context, - key string, - value []byte, -) error { - err := d.DHT.PutValue(ctx, key, value) - if err != nil { - return err - } - d.mutex.Lock() - if !slices.Contains(d.Cache, key) { - d.Cache = append(d.Cache, key) - } - d.mutex.Unlock() - return nil -} - -func (d *DHTService) GetValue( - ctx context.Context, - key string, -) (*DHTRecord, error) { - dht, err := d.DHT.GetValue(ctx, key) - if err != nil { - cache := []string{} - d.mutex.Lock() - for _, c := range d.Cache { - if c != key { - cache = append(cache, c) - } - } - d.Cache = cache - d.mutex.Unlock() - return nil, err - } - d.mutex.Lock() - if !slices.Contains(d.Cache, key) { - d.Cache = append(d.Cache, key) - } - d.mutex.Unlock() - var data DHTRecord - json.Unmarshal(dht, &data) - - peerID, err := oclib.GenerateNodeID() - if err != nil { - return nil, err - } - p := &pp.Peer{ - AbstractObject: utils.AbstractObject{ - UUID: uuid.New().String(), - Name: data.Name, - }, - State: pp.ONLINE, - Relation: pp.SELF, - PeerID: peerID, - PublicKey: string(data.PubKey), - Url: data.URL, - NATSUrl: oclib.GetConfig().NATSUrl, - WalletAddress: data.Wallet, - } - b, err := json.Marshal(p) - if err != nil { - return nil, err - } - tools.NewNATSCaller().SetNATSPub(tools.CREATE_RESOURCE, tools.NATSResponse{ - FromApp: "oc-discovery", - Datatype: tools.PEER, - Method: int(tools.CREATE_PEER), - Payload: b, - }) - - /*if founded, _, err := access.Search(nil, fmt.Sprintf("%v", pp.SELF.EnumIndex()), false); - err == nil && len(founded) > 0 && founded[0].(*pp.Peer).Relation != pp.BLACKLIST { - f.(*pp.Peer).State = pp.ONLINE - f.(*pp.Peer).NATSUrl = p.NATSUrl - f.(*pp.Peer).Url = p.Url - f.(*pp.Peer).PeerID = p.PeerID - f.(*pp.Peer).Relation = p.Relation - f.(*pp.Peer).WalletAddress = p.WalletAddress - access.UpdateOne(f, f.GetID()) - }*/ - - return &data, err -} - -func (d *DHTService) generateKey() (string, error) { - s, err := oclib.GenerateNodeID() - if err != nil { - return s, err - } - return "/opencloud/peer/" + s, nil -} - -// Create your peer. -func (d *DHTService) ClaimName( - ctx context.Context, - name string, - endPoint string, - avoidVerification bool, -) (*pp.Peer, error) { - if endPoint == "" { - return nil, errors.New("no endpoint found for peer" + name) - } - - peerID, err := oclib.GenerateNodeID() - if err != nil { - return nil, err - } - - pub := d.Host.Peerstore().PubKey(d.Host.ID()) - pubBytes, _ := pub.Raw() - - now := time.Now() - expiry := now.Add(150 * time.Second) - - rec := DHTRecord{ - Name: name, - PeerID: peerID, - PubKey: pubBytes, - } - - payload, _ := json.Marshal(rec) - sig, _ := daemons.Sign(d.Host.Peerstore().PrivKey(d.Host.ID()), payload) - rec.Signature = sig - - rec.URL = endPoint - rec.NATSUrl = oclib.GetConfig().NATSUrl - rec.State = pp.ONLINE.EnumIndex() - rec.ExpiryDate = expiry - - data, _ := json.Marshal(rec) - - key, err := d.generateKey() - if err != nil { - return nil, err - } - - // retrieve your key name in standard - if !avoidVerification { - old, err := d.GetValue(ctx, key) - if err == nil { - if old.PeerID != peerID { // check if someone claims your name before - return nil, errors.New("name already claimed by another peer") - } - if now.After(old.ExpiryDate) { - payload, _ := json.Marshal(rec) - d.PutValue(ctx, key, payload) - } - } - } - - if err := d.PutValue(ctx, key, data); err != nil { - return nil, err - } - - pubStr := base64.StdEncoding.EncodeToString(pubBytes) - d.Key = key - if err != nil { - return nil, err - } - p := &pp.Peer{ - AbstractObject: utils.AbstractObject{ - UUID: uuid.New().String(), - Name: name, - }, - State: pp.ONLINE, - Relation: pp.SELF, - PeerID: peerID, - PublicKey: pubStr, - Url: endPoint, - NATSUrl: oclib.GetConfig().NATSUrl, - WalletAddress: "my-wallet", - } - - b, err := json.Marshal(p) - if err != nil { - return nil, err - } - tools.NewNATSCaller().SetNATSPub(tools.CREATE_RESOURCE, tools.NATSResponse{ - FromApp: "oc-discovery", - Datatype: tools.PEER, - Method: int(tools.CREATE_PEER), - Payload: b, - }) - return p, nil -} - -// Discover a specific Peer -func (d *DHTService) DiscoverPeers(ctx context.Context, name string) ([]*pp.Peer, error) { - peers := []*pp.Peer{} - key, err := d.generateKey() - if err != nil { - return nil, err - } - datas, _ := d.DHT.SearchValue(ctx, key) - for data := range datas { - var dht *DHTRecord - if err := json.Unmarshal(data, dht); err != nil { - return peers, err - } - if p, err := d.treatPeer(ctx, key, dht); err == nil { - peers = append(peers, p) - } - } - return peers, nil -} - -func (d *DHTService) GetPeer(ctx context.Context, name string) (*pp.Peer, error) { - key, err := d.generateKey() - if err != nil { - return nil, err - } - data, err := d.GetValue(ctx, key) - if err != nil { - return nil, errors.New("no DHT peer not found") - } - return d.treatPeer(ctx, key, data) -} - -func (d *DHTService) treatPeer(ctx context.Context, key string, rec *DHTRecord) (*pp.Peer, error) { - pubKey, err := crypto.UnmarshalPublicKey(rec.PubKey) - if err != nil { - return nil, err - } - - now := time.Now() - dht := DHTRecord{ - Name: rec.Name, - PeerID: rec.PeerID, - PubKey: rec.PubKey, - } - payload, _ := json.Marshal(dht) - - if ok, _ := daemons.Verify(pubKey, payload, rec.Signature); !ok { - return nil, errors.New("invalid signature") - } - pubBytes, _ := pubKey.Raw() - pubStr := base64.StdEncoding.EncodeToString(pubBytes) - - rel := pp.NONE - if d.Key == key { - rel = pp.SELF - } - - p := &pp.Peer{ - AbstractObject: utils.AbstractObject{ - UUID: uuid.New().String(), - Name: rec.Name, - }, - State: pp.ONLINE, - Relation: rel, - PeerID: rec.PeerID, - PublicKey: pubStr, - Url: rec.URL, - NATSUrl: rec.NATSUrl, - WalletAddress: rec.Wallet, - } - if now.After(rec.ExpiryDate) { // is expired - rec.State = pp.OFFLINE.EnumIndex() - p.State = pp.OFFLINE - payload, _ := json.Marshal(rec) - d.PutValue(ctx, key, payload) - - b, err := json.Marshal(p) - if err != nil { - return nil, err - } - tools.NewNATSCaller().SetNATSPub(tools.CREATE_RESOURCE, tools.NATSResponse{ - FromApp: "oc-discovery", - Datatype: tools.PEER, - Method: int(tools.CREATE_PEER), - Payload: b, - }) - return nil, errors.New("peer " + key + " is expired") - } - if p.State == pp.OFFLINE { - return nil, errors.New("peer " + key + " is offline") - } - return p, nil -} - -// TODO : HEARTBEAT diff --git a/daemons/pubsub/nats.go b/daemons/nats.go similarity index 85% rename from daemons/pubsub/nats.go rename to daemons/nats.go index a18b9de..d1b929e 100644 --- a/daemons/pubsub/nats.go +++ b/daemons/nats.go @@ -1,14 +1,15 @@ -package pubsub +package daemons import ( "context" "encoding/json" "fmt" + "oc-discovery/daemons/node" "cloud.o-forge.io/core/oc-lib/tools" ) -func ListenNATS() { +func ListenNATS(n node.Node) { tools.NewNATSCaller().ListenNats(map[tools.NATSMethod]func(tools.NATSResponse){ tools.PROPALGATION_EVENT: func(resp tools.NATSResponse) { var propalgation tools.PropalgationMessage @@ -18,13 +19,12 @@ func ListenNATS() { dtt := tools.DataType(propalgation.DataType) dt = &dtt } - if err == nil { switch propalgation.Action { case tools.PB_CREATE: case tools.PB_UPDATE: case tools.PB_DELETE: - GetPubSubService().ToPartnerPublishEvent( + n.StreamService.ToPartnerPublishEvent( context.Background(), propalgation.Action, dt, resp.User, @@ -33,7 +33,7 @@ func ListenNATS() { case tools.PB_SEARCH: m := map[string]interface{}{} json.Unmarshal(propalgation.Payload, &m) - GetPubSubService().SearchPublishEvent( + n.PubSubService.SearchPublishEvent( context.Background(), dt, fmt.Sprintf("%v", m["type"]), diff --git a/daemons/node/common/common_pubsub.go b/daemons/node/common/common_pubsub.go new file mode 100644 index 0000000..5587052 --- /dev/null +++ b/daemons/node/common/common_pubsub.go @@ -0,0 +1,148 @@ +package common + +import ( + "context" + "encoding/json" + "errors" + "sync" + "time" + + "cloud.o-forge.io/core/oc-lib/models/peer" + "cloud.o-forge.io/core/oc-lib/tools" + pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/libp2p/go-libp2p/core/host" + pp "github.com/libp2p/go-libp2p/core/peer" +) + +type Event struct { + Type string `json:"type"` + From string `json:"from"` // peerID + + User string + + DataType int64 `json:"datatype"` + Timestamp int64 `json:"ts"` + Payload []byte `json:"payload"` + Signature []byte `json:"sig"` +} + +func NewEvent(name string, from string, dt *tools.DataType, user string, payload []byte) *Event { + priv, err := LoadKeyFromFile(false) // your node private key + if err != nil { + return nil + } + evt := &Event{ + Type: name, + From: from, + User: user, + Timestamp: time.Now().Unix(), + Payload: payload, + } + if dt != nil { + evt.DataType = int64(dt.EnumIndex()) + } else { + evt.DataType = -1 + } + + body, _ := json.Marshal(evt) + sig, _ := priv.Sign(body) + evt.Signature = sig + return evt +} + +func (e *Event) RawEvent() *Event { + return &Event{ + Type: e.Type, + From: e.From, + User: e.User, + DataType: e.DataType, + Timestamp: e.Timestamp, + Payload: e.Payload, + } +} + +func (e *Event) toRawByte() ([]byte, error) { + return json.Marshal(e.RawEvent()) +} + +func (event *Event) Verify(p *peer.Peer) error { + if p == nil { + return errors.New("no peer found") + } + if p.Relation == peer.BLACKLIST { // if peer is blacklisted... quit... + return errors.New("peer is blacklisted") + } + pubKey, err := PubKeyFromString(p.PublicKey) // extract pubkey from pubkey str + if err != nil { + return errors.New("pubkey is malformed") + } + data, err := event.toRawByte() + if err != nil { + return err + } // extract byte from raw event excluding signature. + if ok, _ := pubKey.Verify(data, event.Signature); !ok { // then verify if pubkey sign this message... + return errors.New("check signature failed") + } + return nil +} + +type TopicNodeActivityPub struct { + DID string + PeerID string + NodeActivity peer.PeerState +} + +type LongLivedPubSubService struct { + Host host.Host + LongLivedPubSubs map[string]*pubsub.Topic + PubsubMu sync.RWMutex +} + +func NewLongLivedPubSubService(h host.Host) *LongLivedPubSubService { + return &LongLivedPubSubService{ + Host: h, + LongLivedPubSubs: map[string]*pubsub.Topic{}, + } +} + +func (s *LongLivedPubSubService) processEvent( + ctx context.Context, + p *peer.Peer, + event *Event, + topicName string, handler func(context.Context, string, *Event) error) error { + if err := event.Verify(p); err != nil { + return err + } + return handler(ctx, topicName, event) +} + +const TopicPubSubNodeActivity = "oc-node-activity" +const TopicPubSubSearch = "oc-node-search" + +func (s *LongLivedPubSubService) SubscribeToNodeActivity(ps *pubsub.PubSub) error { + ps.RegisterTopicValidator(TopicPubSubNodeActivity, func(ctx context.Context, p pp.ID, m *pubsub.Message) bool { + return true + }) + if topic, err := ps.Join(TopicPubSubNodeActivity); err != nil { + return err + } else { + s.PubsubMu.Lock() + defer s.PubsubMu.Unlock() + s.LongLivedPubSubs[TopicPubSubNodeActivity] = topic + } + return nil +} + +func (s *LongLivedPubSubService) SubscribeToSearch(ps *pubsub.PubSub) error { + ps.RegisterTopicValidator(TopicPubSubSearch, func(ctx context.Context, p pp.ID, m *pubsub.Message) bool { + return true + }) + if topic, err := ps.Join(TopicPubSubSearch); err != nil { + return err + } else { + s.PubsubMu.Lock() + defer s.PubsubMu.Unlock() + s.LongLivedPubSubs[TopicPubSubSearch] = topic + } + return nil +} diff --git a/daemons/node/common/common_stream.go b/daemons/node/common/common_stream.go new file mode 100644 index 0000000..3e642fe --- /dev/null +++ b/daemons/node/common/common_stream.go @@ -0,0 +1,385 @@ +package common + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "oc-discovery/conf" + "strings" + "sync" + "time" + + oclib "cloud.o-forge.io/core/oc-lib" + peer "cloud.o-forge.io/core/oc-lib/models/peer" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/network" + pp "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/protocol" +) + +type LongLivedStreamRecordedService[T interface{}] struct { + *LongLivedPubSubService + StreamRecords map[protocol.ID]map[pp.ID]*StreamRecord[T] + StreamMU sync.RWMutex + maxNodesConn int + isBidirectionnal bool +} + +func NewStreamRecordedService[T interface{}](h host.Host, maxNodesConn int, isBidirectionnal bool) *LongLivedStreamRecordedService[T] { + service := &LongLivedStreamRecordedService[T]{ + LongLivedPubSubService: NewLongLivedPubSubService(h), + StreamRecords: map[protocol.ID]map[pp.ID]*StreamRecord[T]{}, + maxNodesConn: maxNodesConn, + isBidirectionnal: isBidirectionnal, + } + go service.StartGC(30 * time.Second) + // Garbage collection is needed on every Map of Long-Lived Stream... it may be a top level redesigned + go service.Snapshot(1 * time.Hour) + return service +} + +func (ix *LongLivedStreamRecordedService[T]) StartGC(interval time.Duration) { + go func() { + t := time.NewTicker(interval) + defer t.Stop() + for range t.C { + ix.gc() + } + }() +} + +func (ix *LongLivedStreamRecordedService[T]) gc() { + ix.StreamMU.Lock() + defer ix.StreamMU.Unlock() + now := time.Now() + streams := ix.StreamRecords[ProtocolHeartbeat] + if streams == nil { + ix.StreamRecords[ProtocolHeartbeat] = map[pp.ID]*StreamRecord[T]{} + return + } + for pid, rec := range streams { + if now.After(rec.HeartbeatStream.Expiry) || now.Sub(rec.LastSeen) > 2*rec.HeartbeatStream.Expiry.Sub(now) { + for _, sstreams := range ix.StreamRecords { + if sstreams[pid] != nil { + sstreams[pid].Stream.Close() + delete(sstreams, pid) + } + } + ix.PubsubMu.Lock() + if ix.LongLivedPubSubs[TopicPubSubNodeActivity] != nil { + if b, err := json.Marshal(TopicNodeActivityPub{ + DID: rec.HeartbeatStream.DID, + PeerID: pid.String(), + NodeActivity: peer.OFFLINE, + }); err == nil { + ix.LongLivedPubSubs[TopicPubSubNodeActivity].Publish(context.Background(), b) + } + } + ix.PubsubMu.Unlock() + } + } + +} + +func (ix *LongLivedStreamRecordedService[T]) Snapshot(interval time.Duration) { + go func() { + logger := oclib.GetLogger() + t := time.NewTicker(interval) + defer t.Stop() + for range t.C { + infos := ix.snapshot() + for _, inf := range infos { + logger.Info().Msg(" -> " + inf.DID) + } + } + }() +} + +// -------- Snapshot / Query -------- +func (ix *LongLivedStreamRecordedService[T]) snapshot() []*StreamRecord[T] { + ix.StreamMU.Lock() + defer ix.StreamMU.Unlock() + + out := make([]*StreamRecord[T], 0, len(ix.StreamRecords)) + for _, streams := range ix.StreamRecords { + for _, stream := range streams { + out = append(out, stream) + } + } + return out +} + +func (ix *LongLivedStreamRecordedService[T]) HandleNodeHeartbeat(s network.Stream) { + streams := ix.StreamRecords[ProtocolHeartbeat] + pid, hb, err := CheckHeartbeat(ix.Host, s, ix.maxNodesConn) + if err != nil { + return + } + ix.StreamMU.Lock() + defer ix.StreamMU.Unlock() + if streams == nil { + ix.StreamRecords[ProtocolHeartbeat] = map[pp.ID]*StreamRecord[T]{} + return + } + // if record already seen update last seen + if rec, ok := streams[*pid]; ok { + rec.DID = hb.DID + rec.Stream = s + rec.HeartbeatStream = hb.Stream + rec.LastSeen = time.Unix(hb.Timestamp, 0) + } + + // si je l'handle et que je ne suis pas dans une +} + +func CheckHeartbeat(h host.Host, s network.Stream, maxNodes int) (*pp.ID, *Heartbeat, error) { + if len(h.Network().Peers()) >= maxNodes { + return nil, nil, fmt.Errorf("too many connections, try another indexer") + } + defer s.Close() + + var hb Heartbeat + if err := json.NewDecoder(s).Decode(&hb); err != nil { + return nil, nil, err + } + pid, err := pp.Decode(hb.PeerID) + hb.Stream.Stream = s // here is the long-lived bidirectionnal heart bit. + return &pid, &hb, err +} + +type StreamRecord[T interface{}] struct { + DID string + HeartbeatStream *Stream + Stream network.Stream + Record T + LastSeen time.Time // to check expiry +} + +type Stream struct { + DID string `json:"did"` + Stream network.Stream + Expiry time.Time `json:"expiry"` +} + +func NewStream[T interface{}](s network.Stream, did string, record T) *Stream { + return &Stream{ + DID: did, + Stream: s, + Expiry: time.Now().UTC().Add(2 * time.Minute), + } +} + +type ProtocolStream map[protocol.ID]map[pp.ID]*Stream + +func (ps ProtocolStream) Get(protocol protocol.ID) map[pp.ID]*Stream { + if ps[protocol] == nil { + ps[protocol] = map[pp.ID]*Stream{} + } + + return ps[protocol] +} + +func (ps ProtocolStream) Add(protocol protocol.ID, peerID *pp.ID, s *Stream) error { + if ps[protocol] == nil { + ps[protocol] = map[pp.ID]*Stream{} + } + if peerID != nil { + if s != nil { + ps[protocol][*peerID] = s + } else { + return errors.New("unable to add stream : stream missing") + } + } + return nil +} + +func (ps ProtocolStream) Delete(protocol protocol.ID, peerID *pp.ID) { + if streams, ok := ps[protocol]; ok { + if peerID != nil && streams[*peerID] != nil { + streams[*peerID].Stream.Close() + delete(streams, *peerID) + } else { + for _, s := range ps { + for _, v := range s { + v.Stream.Close() + } + } + delete(ps, protocol) + } + } +} + +const ( + ProtocolPublish = "/opencloud/record/publish/1.0" + ProtocolGet = "/opencloud/record/get/1.0" +) + +var StaticIndexers []*pp.AddrInfo = []*pp.AddrInfo{} +var StreamIndexers ProtocolStream = ProtocolStream{} + +func ConnectToIndexers(h host.Host, minIndexer int, maxIndexer int, myPID pp.ID) { + logger := oclib.GetLogger() + ctx := context.Background() + addresses := strings.Split(conf.GetConfig().IndexerAddresses, ",") + + if len(addresses) > maxIndexer { + addresses = addresses[0:maxIndexer] + } + + for _, indexerAddr := range addresses { + ad, err := pp.AddrInfoFromString(indexerAddr) + if err != nil { + logger.Err(err) + continue + } + if h.Network().Connectedness(ad.ID) != network.Connected { + if err := h.Connect(ctx, *ad); err != nil { + logger.Err(err) + continue + } + } + StaticIndexers = append(StaticIndexers, ad) + // make a privilege streams with indexer. + for _, proto := range []protocol.ID{ProtocolPublish, ProtocolGet, ProtocolHeartbeat} { + AddStreamProtocol(nil, StreamIndexers, h, proto, ad.ID, myPID, nil) + } + } + if len(StaticIndexers) == 0 { + panic("can't run a node with no indexers") + } + + if len(StaticIndexers) < minIndexer { + // TODO : ask for unknown indexer. + } + SendHeartbeat(ctx, ProtocolHeartbeat, h, StreamIndexers, StaticIndexers, 20) // your indexer is just like a node for the next indexer. +} + +func AddStreamProtocol(ctx *context.Context, protoS ProtocolStream, h host.Host, proto protocol.ID, id pp.ID, mypid pp.ID, onStreamCreated *func(network.Stream)) ProtocolStream { + if onStreamCreated == nil { + f := func(s network.Stream) { + protoS[proto][id] = &Stream{ + Stream: s, + Expiry: time.Now().Add(2 * time.Minute), + } + } + onStreamCreated = &f + } + f := *onStreamCreated + if mypid > id { + if ctx == nil { + c := context.Background() + ctx = &c + } + if protoS[proto] == nil { + protoS[proto] = map[pp.ID]*Stream{} + } + + if protoS[proto][id] != nil { + protoS[proto][id].Expiry = time.Now().Add(2 * time.Minute) + } else { + s, err := h.NewStream(*ctx, id, proto) + if err != nil { + panic(err.Error()) + } + f(s) + } + } else { + h.SetStreamHandler(proto, f) + } + + return protoS +} + +type Heartbeat struct { + Stream *Stream `json:"stream"` + DID string `json:"did"` + PeerID string `json:"peer_id"` + Timestamp int64 `json:"timestamp"` +} + +type HeartbeatInfo []struct { + Info []byte `json:"info"` +} + +const ProtocolHeartbeat = "/opencloud/heartbeat/1.0" + +func SendHeartbeat(ctx context.Context, proto protocol.ID, h host.Host, ps ProtocolStream, peers []*pp.AddrInfo, interval time.Duration) { + peerID, err := oclib.GenerateNodeID() + if err == nil { + panic("can't heartbeat daemon failed to start") + } + go func() { + t := time.NewTicker(interval) + defer t.Stop() + for { + select { + case <-t.C: + hb := Heartbeat{ + DID: peerID, + PeerID: h.ID().String(), + Timestamp: time.Now().Unix(), + } + for _, ix := range peers { + _ = sendHeartbeat(ctx, h, proto, ix, hb, ps, interval*time.Second) + } + case <-ctx.Done(): + return + } + } + }() +} + +func sendHeartbeat(ctx context.Context, h host.Host, proto protocol.ID, p *pp.AddrInfo, hb Heartbeat, ps ProtocolStream, interval time.Duration) error { + streams := ps.Get(proto) + if len(streams) == 0 { + return errors.New("no stream for protocol heartbeat founded") + } + pss, exists := streams[p.ID] + + ctxTTL, _ := context.WithTimeout(ctx, 3*interval) + // Connect si nécessaire + if h.Network().Connectedness(p.ID) != network.Connected { + _ = h.Connect(ctxTTL, *p) + exists = false // on devra recréer le stream + } + + // Crée le stream si inexistant ou fermé + if !exists || pss.Stream == nil { + s, err := h.NewStream(ctx, p.ID, proto) + if err != nil { + return err + } + pss = &Stream{ + Stream: s, + Expiry: time.Now().UTC().Add(2 * time.Minute), + } + streams[p.ID] = pss + } + + // Envoie le heartbeat + ss := json.NewEncoder(pss.Stream) + err := ss.Encode(&hb) + if err != nil { + pss.Stream.Close() + pss.Stream = nil // recréera au prochain tick + return err + } + pss.Expiry = time.Now().UTC().Add(2 * time.Minute) + return nil +} + +/* +func SearchPeer(search string) ([]*peer.Peer, error) { + ps := []*peer.Peer{} + access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil) + peers := access.Search(nil, search, false) + if len(peers.Data) == 0 { + return ps, errors.New("no self available") + } + for _, p := range peers.Data { + ps = append(ps, p.(*peer.Peer)) + } + return ps, nil +} +*/ diff --git a/daemons/utils.go b/daemons/node/common/crypto.go similarity index 99% rename from daemons/utils.go rename to daemons/node/common/crypto.go index fe9fd0a..5fbeea1 100644 --- a/daemons/utils.go +++ b/daemons/node/common/crypto.go @@ -1,4 +1,4 @@ -package daemons +package common import ( "bytes" diff --git a/daemons/node/common/interface.go b/daemons/node/common/interface.go new file mode 100644 index 0000000..d197b8f --- /dev/null +++ b/daemons/node/common/interface.go @@ -0,0 +1,11 @@ +package common + +import ( + "context" + + "cloud.o-forge.io/core/oc-lib/models/peer" +) + +type DiscoveryPeer interface { + GetPeerRecord(ctx context.Context, key string) (*peer.Peer, error) +} diff --git a/daemons/node/indexer/handler.go b/daemons/node/indexer/handler.go new file mode 100644 index 0000000..30387f0 --- /dev/null +++ b/daemons/node/indexer/handler.go @@ -0,0 +1,189 @@ +package indexer + +import ( + "encoding/base64" + "encoding/json" + "errors" + "oc-discovery/daemons/node/common" + "time" + + pp "cloud.o-forge.io/core/oc-lib/models/peer" + "cloud.o-forge.io/core/oc-lib/models/utils" + "cloud.o-forge.io/core/oc-lib/tools" + "github.com/libp2p/go-libp2p/core/crypto" + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" +) + +type PeerRecord struct { + Name string `json:"name"` + DID string `json:"did"` // real PEER ID + PeerID string `json:"peer_id"` + PubKey []byte `json:"pub_key"` + APIUrl string `json:"api_url"` + StreamAddress string `json:"stream_address"` + NATSAddress string `json:"nats_address"` + WalletAddress string `json:"wallet_address"` + Signature []byte `json:"signature"` + ExpiryDate time.Time `json:"expiry_date"` +} + +func (p *PeerRecord) Sign() error { + priv, err := common.LoadKeyFromFile(false) + if err != nil { + return err + } + dht := PeerRecord{ + Name: p.Name, + DID: p.DID, + PubKey: p.PubKey, + ExpiryDate: p.ExpiryDate, + } + payload, _ := json.Marshal(dht) + b, err := common.Sign(priv, payload) + p.Signature = b + return err +} + +func (p *PeerRecord) Verify() (crypto.PubKey, error) { + pubKey, err := crypto.UnmarshalPublicKey(p.PubKey) // retrieve pub key in message + if err != nil { + return pubKey, err + } + dht := PeerRecord{ + Name: p.Name, + DID: p.DID, + PubKey: p.PubKey, + ExpiryDate: p.ExpiryDate, + } + payload, _ := json.Marshal(dht) + + if ok, _ := common.Verify(pubKey, payload, p.Signature); !ok { // verify minimal message was sign per pubKey + return pubKey, errors.New("invalid signature") + } + return pubKey, nil +} + +func (pr *PeerRecord) ExtractPeer(ourkey string, key string, pubKey crypto.PubKey) (bool, *pp.Peer, error) { + pubBytes, _ := pubKey.Raw() + + rel := pp.NONE + if ourkey == key { // at this point is PeerID is same as our... we are... thats our peer INFO + rel = pp.SELF + } + + p := &pp.Peer{ + AbstractObject: utils.AbstractObject{ + UUID: pr.DID, + Name: pr.Name, + }, + State: pp.ONLINE, + Relation: rel, // VERIFY.... it crush nothing + PeerID: pr.PeerID, + PublicKey: base64.StdEncoding.EncodeToString(pubBytes), + APIUrl: pr.APIUrl, + StreamAddress: pr.StreamAddress, + NATSAddress: pr.NATSAddress, + WalletAddress: pr.WalletAddress, + } + if time.Now().After(pr.ExpiryDate) { // is expired + p.State = pp.OFFLINE // then is considers OFFLINE + } + b, err := json.Marshal(p) + if err != nil { + return pp.SELF == p.Relation, nil, err + } + tools.NewNATSCaller().SetNATSPub(tools.CREATE_RESOURCE, tools.NATSResponse{ + FromApp: "oc-discovery", + Datatype: tools.PEER, + Method: int(tools.CREATE_PEER), + Payload: b, + }) + if p.State == pp.OFFLINE { + return pp.SELF == p.Relation, nil, errors.New("peer " + key + " is offline") + } + return pp.SELF == p.Relation, p, nil +} + +type GetValue struct { + Key string `json:"key"` +} + +type GetResponse struct { + Found bool `json:"found"` + Record PeerRecord `json:"record,omitempty"` +} + +func (ix *IndexerService) initNodeHandler() { + ix.Host.SetStreamHandler(common.ProtocolHeartbeat, ix.HandleNodeHeartbeat) + ix.Host.SetStreamHandler(common.ProtocolPublish, ix.handleNodePublish) + ix.Host.SetStreamHandler(common.ProtocolGet, ix.handleNodeGet) +} + +func (ix *IndexerService) handleNodePublish(s network.Stream) { + defer s.Close() + + var rec PeerRecord + if err := json.NewDecoder(s).Decode(&rec); err != nil { + return + } + if rec.PeerID == "" || rec.ExpiryDate.Before(time.Now()) { // already expired + return + } + pid, err := peer.Decode(rec.PeerID) + if err != nil { + return + } + + ix.StreamMU.Lock() + defer ix.StreamMU.Unlock() + + streams := ix.StreamRecords[common.ProtocolPublish] + if streams == nil { + ix.StreamRecords[common.ProtocolPublish] = map[peer.ID]*common.StreamRecord[PeerRecord]{} + return + } + + if srec, ok := streams[pid]; ok { + srec.DID = rec.DID + srec.Record = rec + srec.LastSeen = time.Now() + } else { + streams[pid] = &common.StreamRecord[PeerRecord]{ // HeartBeat wil + DID: rec.DID, + Record: rec, + LastSeen: time.Now(), + } + } +} + +func (ix *IndexerService) handleNodeGet(s network.Stream) { + defer s.Close() + + var req GetValue + if err := json.NewDecoder(s).Decode(&req); err != nil { + return + } + ix.StreamMU.Lock() + defer ix.StreamMU.Unlock() + + streams := ix.StreamRecords[common.ProtocolGet] + if streams == nil { + ix.StreamRecords[common.ProtocolGet] = map[peer.ID]*common.StreamRecord[PeerRecord]{} + return + } + + // simple lookup by PeerID (or DID) + for _, rec := range streams { + if rec.Record.DID == req.Key || rec.Record.PeerID == req.Key { // OK + resp := GetResponse{ + Found: true, + Record: rec.Record, + } + _ = json.NewEncoder(s).Encode(resp) + return + } + } + // Not found + _ = json.NewEncoder(s).Encode(GetResponse{Found: false}) +} diff --git a/daemons/node/indexer/service.go b/daemons/node/indexer/service.go new file mode 100644 index 0000000..0dcce9e --- /dev/null +++ b/daemons/node/indexer/service.go @@ -0,0 +1,49 @@ +package indexer + +import ( + "context" + "oc-discovery/daemons/node/common" + + pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/libp2p/go-libp2p/core/host" +) + +// Index Record is the model for the specialized registry of node connected to Indexer +type IndexerService struct { + *common.LongLivedStreamRecordedService[PeerRecord] + PS *pubsub.PubSub + isStrictIndexer bool +} + +// if a pubsub is given... indexer is also an active oc-node. If not... your a strict indexer +func NewIndexerService(h host.Host, ps *pubsub.PubSub, maxNode int) *IndexerService { + var err error + ix := &IndexerService{ + LongLivedStreamRecordedService: common.NewStreamRecordedService[PeerRecord](h, maxNode, false), + isStrictIndexer: ps == nil, + } + if ps == nil { // generate your fresh gossip for the flow of killed node... EVERYBODY should know ! + ps, err = pubsub.NewGossipSub(context.Background(), ix.Host) + if err != nil { + panic(err) // can't run your indexer without a propalgation pubsub, of state of node. + } + } + ix.PS = ps + // later TODO : all indexer laucnh a private replica of them self. DEV OPS + if ix.isStrictIndexer { + common.ConnectToIndexers(h, 0, 5, ix.Host.ID()) // TODO : make var to change how many indexers are allowed. + ix.SubscribeToNodeActivity(ix.PS) // now we subscribe to a long run topic named node-activity, to relay message. + ix.SubscribeToSearch(ix.PS) + } + ix.initNodeHandler() // then listen up on every protocol expected + return ix +} + +func (ix *IndexerService) Close() { + for _, s := range ix.StreamRecords { + for _, ss := range s { + ss.Stream.Close() + ss.HeartbeatStream.Stream.Close() + } + } +} diff --git a/daemons/node/node.go b/daemons/node/node.go new file mode 100644 index 0000000..ba912e3 --- /dev/null +++ b/daemons/node/node.go @@ -0,0 +1,235 @@ +package node + +import ( + "context" + "crypto/sha256" + "encoding/json" + "errors" + "fmt" + "oc-discovery/conf" + "oc-discovery/daemons/node/common" + "oc-discovery/daemons/node/indexer" + "oc-discovery/daemons/node/pubsub" + "oc-discovery/daemons/node/stream" + "time" + + oclib "cloud.o-forge.io/core/oc-lib" + "cloud.o-forge.io/core/oc-lib/models/peer" + "github.com/libp2p/go-libp2p" + pubsubs "github.com/libp2p/go-libp2p-pubsub" + pp "github.com/libp2p/go-libp2p/core/peer" +) + +type Node struct { + *common.LongLivedStreamRecordedService[interface{}] // change type of stream + PS *pubsubs.PubSub + IndexerService *indexer.IndexerService + PubSubService *pubsub.PubSubService + StreamService *stream.StreamService + PeerID pp.ID +} + +func InitNode(isNode bool, isIndexer bool) (*Node, error) { + if !isNode && !isIndexer { + return nil, errors.New("wait... what ? your node need to at least something. Retry we can't be friend in that case") + } + logger := oclib.GetLogger() + logger.Info().Msg("retrieving private key...") + priv, err := common.LoadKeyFromFile(false) // your node private key + if err != nil { + return nil, err + } + logger.Info().Msg("retrieving psk file...") + psk, err := common.LoadPSKFromFile() // network common private Network. Public OC PSK is Public Network + if err != nil { + return nil, nil + } + logger.Info().Msg("open a host...") + h, err := libp2p.New( + libp2p.PrivateNetwork(psk), + libp2p.Identity(priv), + libp2p.ListenAddrStrings( + fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", conf.GetConfig().NodeEndpointPort), + ), + ) + if err != nil { + return nil, errors.New("no host no node") + } + node := &Node{ + PeerID: h.ID(), + LongLivedStreamRecordedService: common.NewStreamRecordedService[interface{}](h, 1000, false), + } + var ps *pubsubs.PubSub + if isNode { + logger.Info().Msg("generate opencloud node...") + ps, err = pubsubs.NewGossipSub(context.Background(), node.Host) + if err != nil { + panic(err) // can't run your node without a propalgation pubsub, of state of node. + } + node.PS = ps + common.ConnectToIndexers(node.Host, 0, 5, node.PeerID) // TODO : make var to change how many indexers are allowed. + node.claimInfo(conf.GetConfig().Name, conf.GetConfig().Hostname) + node.SubscribeToNodeActivity(node.PS) // now we subscribe to a long run topic named node-activity, to relay message. + node.SubscribeToSearch(node.PS) + node.StartGC(30 * time.Second) + + if node.StreamService, err = stream.InitStream(context.Background(), node.Host, node.PeerID, 1000, node); err != nil { + panic(err) + } + + if node.PubSubService, err = pubsub.InitPubSub(context.Background(), node.Host, node.PS, node, node.StreamService); err != nil { + panic(err) + } + } + if isIndexer { + logger.Info().Msg("generate opencloud indexer...") + node.IndexerService = indexer.NewIndexerService(node.Host, ps, 5) + } + return node, nil +} + +func (d *Node) Close() { + d.IndexerService.Close() + d.PubSubService.Close() + d.StreamService.Close() + d.Host.Close() +} + +func (d *Node) publishPeerRecord( + rec *indexer.PeerRecord, +) error { + priv, err := common.LoadKeyFromFile(false) // your node private key + if err != nil { + return err + } + if common.StreamIndexers[common.ProtocolPublish] == nil { + return errors.New("no protocol Publish is set up on the node") + } + for _, ad := range common.StaticIndexers { + if common.StreamIndexers[common.ProtocolPublish][ad.ID] == nil { + return errors.New("no protocol Publish for peer " + ad.ID.String() + " is set up on the node") + } + stream := common.StreamIndexers[common.ProtocolPublish][ad.ID] + base := indexer.PeerRecord{ + Name: rec.Name, + DID: rec.DID, + PubKey: rec.PubKey, + ExpiryDate: time.Now().UTC().Add(2 * time.Minute), + } + payload, _ := json.Marshal(base) + hash := sha256.Sum256(payload) + + rec.ExpiryDate = base.ExpiryDate + rec.Signature, err = priv.Sign(hash[:]) + + if err := json.NewEncoder(stream.Stream).Encode(&rec); err != nil { // then publish on stream + return err + } + } + return nil +} + +func (d *Node) GetPeerRecord( + ctx context.Context, + key string, +) (*peer.Peer, error) { + var err error + var info *indexer.PeerRecord + if common.StreamIndexers[common.ProtocolPublish] == nil { + return nil, errors.New("no protocol Publish is set up on the node") + } + for _, ad := range common.StaticIndexers { + if common.StreamIndexers[common.ProtocolPublish][ad.ID] == nil { + return nil, errors.New("no protocol Publish for peer " + ad.ID.String() + " is set up on the node") + } + stream := common.StreamIndexers[common.ProtocolPublish][ad.ID] + if err := json.NewEncoder(stream.Stream).Encode(indexer.GetValue{Key: key}); err != nil { + return nil, err + } + + var resp indexer.GetResponse + if err := json.NewDecoder(stream.Stream).Decode(&resp); err != nil { + return nil, err + } + if resp.Found { + info = &resp.Record + break + } + } + var p *peer.Peer + if info != nil { + if pk, err := info.Verify(); err != nil { + return nil, err + } else if ok, p, err := info.ExtractPeer(d.PeerID.String(), key, pk); err != nil { + return nil, err + } else { + if ok { + d.publishPeerRecord(info) + } + return p, nil + } + } + return p, err +} + +func (d *Node) claimInfo( + name string, + endPoint string, // TODO : endpoint is not necesserry StreamAddress +) (*peer.Peer, error) { + if endPoint == "" { + return nil, errors.New("no endpoint found for peer") + } + + peerID, err := oclib.GenerateNodeID() + if err != nil { + return nil, err + } + + priv, err := common.LoadKeyFromFile(false) + if err != nil { + return nil, err + } + + pub, err := common.LoadKeyFromFile(true) + if err != nil { + return nil, err + } + + pubBytes, _ := pub.Raw() + + now := time.Now() + expiry := now.Add(150 * time.Second) + + rec := &indexer.PeerRecord{ + Name: name, + DID: peerID, // REAL PEER ID + PubKey: pubBytes, + } + + rec.PeerID = d.Host.ID().String() + d.PeerID = d.Host.ID() + + payload, _ := json.Marshal(rec) + hash := sha256.Sum256(payload) + + rec.Signature, err = priv.Sign(hash[:]) + if err != nil { + return nil, err + } + + rec.APIUrl = endPoint + rec.StreamAddress = "/ip4/" + conf.GetConfig().Hostname + " /tcp/" + fmt.Sprintf("%v", conf.GetConfig().NodeEndpointPort) + " /p2p/" + rec.PeerID + rec.NATSAddress = oclib.GetConfig().NATSUrl + rec.WalletAddress = "my-wallet" + rec.ExpiryDate = expiry + + if err := d.publishPeerRecord(rec); err != nil { + return nil, err + } + if pk, err := rec.Verify(); err != nil { + return nil, err + } else { + _, p, err := rec.ExtractPeer(peerID, peerID, pk) + return p, err + } +} diff --git a/daemons/node/pubsub/handler.go b/daemons/node/pubsub/handler.go new file mode 100644 index 0000000..60a9b77 --- /dev/null +++ b/daemons/node/pubsub/handler.go @@ -0,0 +1,42 @@ +package pubsub + +import ( + "context" + "oc-discovery/daemons/node/common" + + "cloud.o-forge.io/core/oc-lib/tools" +) + +func (ps *PubSubService) handleEvent(ctx context.Context, topicName string, evt *common.Event) error { + action := ps.getTopicName(topicName) + if err := ps.handleEventSearch(ctx, evt, action); err != nil { + return err + } + return nil +} + +func (ps *PubSubService) handleEventSearch( // only : on partner followings. 3 canals for every partner. + ctx context.Context, + evt *common.Event, + action tools.PubSubAction, +) error { + if !(action == tools.PB_SEARCH_RESPONSE || action == tools.PB_SEARCH) { + return nil + } + // TODO VERIFY: FROM SHOULD BE A PEER ID OR A DID + if p, err := ps.Node.GetPeerRecord(ctx, evt.From); err == nil { + if err := evt.Verify(p); err != nil { + return err + } + switch action { + case tools.PB_SEARCH: // when someone ask for search. + if err := ps.StreamService.SendResponse(p, evt); err != nil { + return err + } + + default: + return nil + } + } + return nil +} diff --git a/daemons/node/pubsub/publish.go b/daemons/node/pubsub/publish.go new file mode 100644 index 0000000..5945799 --- /dev/null +++ b/daemons/node/pubsub/publish.go @@ -0,0 +1,82 @@ +package pubsub + +import ( + "context" + "encoding/json" + "errors" + "oc-discovery/daemons/node/common" + "oc-discovery/models" + + oclib "cloud.o-forge.io/core/oc-lib" + "cloud.o-forge.io/core/oc-lib/dbs" + "cloud.o-forge.io/core/oc-lib/tools" +) + +func (ps *PubSubService) SearchPublishEvent( + ctx context.Context, dt *tools.DataType, typ string, user string, search string) error { + switch typ { + case "known": // define Search Strategy + return ps.StreamService.SearchKnownPublishEvent(dt, user, search) //if partners focus only them*/ + case "partner": // define Search Strategy + return ps.StreamService.SearchPartnersPublishEvent(dt, user, search) //if partners focus only them*/ + case "all": // Gossip PubSub + b, err := json.Marshal(map[string]string{"search": search}) + if err != nil { + return err + } + return ps.searchPublishEvent(ctx, dt, user, b) + default: + return errors.New("no type of research found") + } +} + +func (ps *PubSubService) searchPublishEvent( + ctx context.Context, dt *tools.DataType, user string, payload []byte) error { + id, err := oclib.GenerateNodeID() + if err != nil { + return err + } + if err := ps.subscribeEvents(ctx, dt, tools.PB_SEARCH_RESPONSE, id, 60); err != nil { // TODO Catpure Event ! + return err + } + return ps.publishEvent(ctx, dt, tools.PB_SEARCH, user, "", payload, false) +} + +func (ps *PubSubService) publishEvent( + ctx context.Context, dt *tools.DataType, action tools.PubSubAction, user string, + peerID string, payload []byte, chanNamedByDt bool, +) error { + name := action.String() + "#" + peerID + if chanNamedByDt && dt != nil { // if a datatype is precised then : app.action.datatype#peerID + name = action.String() + "." + (*dt).String() + "#" + peerID + } + + from, err := oclib.GenerateNodeID() + if err != nil { + return err + } + priv, err := common.LoadKeyFromFile(false) + if err != nil { + return err + } + msg, _ := json.Marshal(models.NewEvent(name, from, dt, user, payload, priv)) + topic, err := ps.PS.Join(name) + if err != nil { + return err + } + return topic.Publish(ctx, msg) +} + +func (pc *PubSubService) getDefaultFilter(search string) map[string][]dbs.Filter { + return map[string][]dbs.Filter{ // filter by like name, short_description, description, owner, url if no filters are provided + "abstractintanciatedresource.abstractresource.abstractobject.name": {{Operator: dbs.LIKE.String(), Value: search}}, + "abstractintanciatedresource.abstractresource.type": {{Operator: dbs.LIKE.String(), Value: search}}, + "abstractintanciatedresource.abstractresource.short_description": {{Operator: dbs.LIKE.String(), Value: search}}, + "abstractintanciatedresource.abstractresource.description": {{Operator: dbs.LIKE.String(), Value: search}}, + "abstractintanciatedresource.abstractresource.owners.name": {{Operator: dbs.LIKE.String(), Value: search}}, + "abstractintanciatedresource.abstractresource.abstractobject.creator_id": {{Operator: dbs.EQUAL.String(), Value: search}}, + } +} + +// TODO REVIEW PUBLISHING + ADD SEARCH ON PUBLIC : YES +// TODO : Search should verify DataType diff --git a/daemons/node/pubsub/service.go b/daemons/node/pubsub/service.go new file mode 100644 index 0000000..11d2407 --- /dev/null +++ b/daemons/node/pubsub/service.go @@ -0,0 +1,45 @@ +package pubsub + +import ( + "context" + "oc-discovery/daemons/node/common" + "oc-discovery/daemons/node/stream" + "strings" + "sync" + + "cloud.o-forge.io/core/oc-lib/tools" + pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/libp2p/go-libp2p/core/host" +) + +type PubSubService struct { + Node common.DiscoveryPeer + Host host.Host + PS *pubsub.PubSub + StreamService *stream.StreamService + Subscription []string + mutex sync.RWMutex +} + +func InitPubSub(ctx context.Context, h host.Host, ps *pubsub.PubSub, node common.DiscoveryPeer, streamService *stream.StreamService) (*PubSubService, error) { + service := &PubSubService{ + Node: node, + Host: h, + StreamService: streamService, + PS: ps, + } + + service.initSubscribeEvents(ctx) + return service, nil +} + +func (ps *PubSubService) getTopicName(topicName string) tools.PubSubAction { + ns := strings.Split(topicName, ".") + if len(ns) > 0 { + return tools.GetActionString(ns[0]) + } + return tools.NONE +} + +func (ix *PubSubService) Close() { +} diff --git a/daemons/pubsub/subscribe.go b/daemons/node/pubsub/subscribe.go similarity index 59% rename from daemons/pubsub/subscribe.go rename to daemons/node/pubsub/subscribe.go index 0506c79..69b1557 100644 --- a/daemons/pubsub/subscribe.go +++ b/daemons/node/pubsub/subscribe.go @@ -4,10 +4,7 @@ import ( "context" "encoding/json" "errors" - "fmt" - "oc-discovery/daemons" - "oc-discovery/daemons/dht" - "oc-discovery/models" + "oc-discovery/daemons/node/common" "slices" "time" @@ -18,43 +15,9 @@ import ( ) func (ps *PubSubService) initSubscribeEvents(ctx context.Context) error { - ourPeerID, err := oclib.GenerateNodeID() - if err != nil { - return err - } if err := ps.subscribeEvents(ctx, nil, tools.PB_SEARCH, "", -1); err != nil { return err } - if err := ps.subscribeEvents(ctx, nil, tools.PB_SEARCH, ourPeerID, -1); err != nil { // we subscribe at our proprer deductible search adresse. - return err - } - if err := ps.initPartnersSubscribeEvents(ctx); err != nil { - return nil - } - return nil -} - -func (ps *PubSubService) initPartnersSubscribeEvents(ctx context.Context) error { - // search all your partners : we check in base for this because we keep actively peer state if partners - access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil) - peers := access.Search(nil, fmt.Sprintf("%v", peer.PARTNER.EnumIndex()), false) - for _, p := range peers.Data { - if err := ps.PartnerSubscribeEvents(ctx, p.(*peer.Peer).PeerID); err != nil { - return err - } - } - return nil -} - -func (ps *PubSubService) PartnerSubscribeEvents(ctx context.Context, myPartnerPeerID string) error { - if myPartnerPeerID == "" { - return errors.New("should discover a particular partner peer") - } - for _, action := range []tools.PubSubAction{tools.PB_CREATE, tools.PB_UPDATE, tools.PB_DELETE} { - if err := ps.subscribeEvents(ctx, nil, action, myPartnerPeerID, -1); err != nil { - return err - } - } return nil } @@ -118,21 +81,21 @@ func (ps *PubSubService) waitResults(ctx context.Context, sub *pubsub.Subscripti } continue } - var evt models.Event + var evt common.Event if err := json.Unmarshal(msg.Data, &evt); err != nil { // map to event continue } - if p, err := dht.GetDHTService().GetPeer(ctx, evt.From); err == nil { - if err := ps.processEventPeerKnown(ctx, p, evt, topicName); err != nil { + if p, err := ps.Node.GetPeerRecord(ctx, evt.From); err == nil { + if err := ps.processEvent(ctx, p, &evt, topicName); err != nil { logger.Err(err) } } } } -func (ps *PubSubService) processEventPeerKnown( - ctx context.Context, p *peer.Peer, event models.Event, topicName string) error { - if err := daemons.VerifyPeer([]*peer.Peer{p}, event); err != nil { +func (ps *PubSubService) processEvent( + ctx context.Context, p *peer.Peer, event *common.Event, topicName string) error { + if err := event.Verify(p); err != nil { return err } return ps.handleEvent(ctx, topicName, event) diff --git a/daemons/node/stream/handler.go b/daemons/node/stream/handler.go new file mode 100644 index 0000000..c9cd0f4 --- /dev/null +++ b/daemons/node/stream/handler.go @@ -0,0 +1,123 @@ +package stream + +import ( + "context" + "encoding/json" + "errors" + "oc-discovery/daemons/node/common" + "strings" + + oclib "cloud.o-forge.io/core/oc-lib" + "cloud.o-forge.io/core/oc-lib/models/peer" + "cloud.o-forge.io/core/oc-lib/models/resources" + "cloud.o-forge.io/core/oc-lib/tools" +) + +func (ps *StreamService) getTopicName(topicName string) tools.PubSubAction { + ns := strings.Split(topicName, ".") + if len(ns) > 0 { + return tools.GetActionString(ns[0]) + } + return tools.NONE +} + +func (ps *StreamService) handleEvent(topicName string, evt *common.Event) error { + action := ps.getTopicName(topicName) + if err := ps.handleEventFromPartner(evt, action); err != nil { + return err + } + if action == tools.PB_SEARCH_RESPONSE { + if err := ps.retrieveResponse(evt); err != nil { + return err + } + } + return nil +} + +func (abs *StreamService) retrieveResponse(event *common.Event) error { // + res, err := resources.ToResource(int(event.DataType), event.Payload) + if err != nil || res == nil { + return nil + } + b, err := json.Marshal(res.Serialize(res)) + tools.NewNATSCaller().SetNATSPub(tools.CATALOG_SEARCH_EVENT, tools.NATSResponse{ + FromApp: "oc-discovery", + Datatype: tools.DataType(event.DataType), + Method: int(tools.CATALOG_SEARCH_EVENT), + Payload: b, + }) + return nil +} + +func (ps *StreamService) handleEventFromPartner(evt *common.Event, action tools.PubSubAction) error { + if !(action == tools.PB_CREATE || action == tools.PB_UPDATE || action == tools.PB_DELETE) { + return nil + } + resource, err := resources.ToResource(int(evt.DataType), evt.Payload) + if err != nil { + return err + } + b, err := json.Marshal(resource) + if err != nil { + return err + } + switch action { + case tools.PB_SEARCH: + access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil) + peers := access.Search(nil, evt.From, false) + if len(peers.Data) > 0 { + p := peers.Data[0].(*peer.Peer) + // TODO : something if peer is missing in our side ! + ps.SendResponse(p, evt) + } else if p, err := ps.Node.GetPeerRecord(context.Background(), evt.From); err == nil { + ps.SendResponse(p, evt) + } + case tools.PB_CREATE: + case tools.PB_UPDATE: + tools.NewNATSCaller().SetNATSPub(tools.CREATE_RESOURCE, tools.NATSResponse{ + FromApp: "oc-discovery", + Datatype: tools.DataType(evt.DataType), + Method: int(tools.CREATE_RESOURCE), + Payload: b, + }) + case tools.PB_DELETE: + tools.NewNATSCaller().SetNATSPub(tools.REMOVE_RESOURCE, tools.NATSResponse{ + FromApp: "oc-discovery", + Datatype: tools.DataType(evt.DataType), + Method: int(tools.REMOVE_RESOURCE), + Payload: b, + }) + default: + return errors.New("no action authorized available : " + action.String()) + } + return nil +} + +func (abs *StreamService) SendResponse(p *peer.Peer, event *common.Event) error { + dts := []oclib.LibDataEnum{oclib.LibDataEnum(event.DataType)} + if event.DataType == -1 { // expect all resources + dts = []oclib.LibDataEnum{oclib.LibDataEnum(oclib.COMPUTE_RESOURCE), oclib.LibDataEnum(oclib.STORAGE_RESOURCE), + oclib.LibDataEnum(oclib.PROCESSING_RESOURCE), oclib.LibDataEnum(oclib.DATA_RESOURCE), oclib.LibDataEnum(oclib.WORKFLOW_RESOURCE)} + } + var m map[string]string + err := json.Unmarshal(event.Payload, &m) + if err != nil { + return err + } + for _, dt := range dts { + access := oclib.NewRequestAdmin(oclib.LibDataEnum(event.DataType), nil) + peerID := p.GetID() + searched := access.Search(abs.FilterPeer(peerID, m["search"]), "", false) + for _, ss := range searched.Data { + if j, err := json.Marshal(ss); err == nil { + if event.DataType != -1 { + ndt := tools.DataType(dt.EnumIndex()) + abs.PublishResources(&ndt, event.User, peerID, j) + } else { + abs.PublishResources(nil, event.User, peerID, j) + } // TODO : TEMP STREAM ! + } + } + } + return nil +} diff --git a/daemons/node/stream/publish.go b/daemons/node/stream/publish.go new file mode 100644 index 0000000..24423e0 --- /dev/null +++ b/daemons/node/stream/publish.go @@ -0,0 +1,150 @@ +package stream + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "oc-discovery/daemons/node/common" + "time" + + oclib "cloud.o-forge.io/core/oc-lib" + "cloud.o-forge.io/core/oc-lib/models/peer" + "cloud.o-forge.io/core/oc-lib/tools" + "github.com/libp2p/go-libp2p/core/network" + pp "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/protocol" +) + +func (ps *StreamService) PublishResources(dt *tools.DataType, user string, toPeerID string, resource []byte) error { + access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil) + p := access.LoadOne(toPeerID) + if p.Err != "" { + return errors.New(p.Err) + } else { + ad, err := pp.AddrInfoFromString(p.Data.(*peer.Peer).StreamAddress) + if err != nil { + return err + } + ps.write(tools.PB_SEARCH, toPeerID, ad, dt, user, resource, ProtocolSearchResource) + } + return nil +} + +func (ps *StreamService) SearchKnownPublishEvent(dt *tools.DataType, user string, search string) error { + access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil) + peers := access.Search(nil, search, false) + if peers.Err != "" { + return errors.New(peers.Err) + } else { + b, err := json.Marshal(map[string]string{"search": search}) + if err != nil { + return err + } + for _, p := range peers.Data { + ad, err := pp.AddrInfoFromString(p.(*peer.Peer).StreamAddress) + if err != nil { + continue + } + ps.write(tools.PB_SEARCH, p.GetID(), ad, dt, user, b, ProtocolSearchResource) + } + } + return nil +} + +func (ps *StreamService) SearchPartnersPublishEvent(dt *tools.DataType, user string, search string) error { + if peers, err := ps.searchPeer(fmt.Sprintf("%v", peer.PARTNER.EnumIndex())); err != nil { + return err + } else { + b, err := json.Marshal(map[string]string{"search": search}) + if err != nil { + return err + } + for _, p := range peers { + ad, err := pp.AddrInfoFromString(p.StreamAddress) + if err != nil { + continue + } + ps.write(tools.PB_SEARCH, p.GetID(), ad, dt, user, b, ProtocolSearchResource) + } + } + return nil +} + +func (ps *StreamService) ToPartnerPublishEvent( + ctx context.Context, action tools.PubSubAction, dt *tools.DataType, user string, payload []byte) error { + if peers, err := ps.searchPeer(fmt.Sprintf("%v", peer.PARTNER.EnumIndex())); err != nil { + return err + } else { + for _, p := range peers { + for _, protocol := range protocols { + ad, err := pp.AddrInfoFromString(p.StreamAddress) + if err != nil { + continue + } + ps.write(action, p.GetID(), ad, dt, user, payload, protocol) + } + } + } + return nil +} + +func (s *StreamService) write( + action tools.PubSubAction, + did string, + peerID *pp.AddrInfo, + dt *tools.DataType, + user string, + payload []byte, + proto protocol.ID) error { + logger := oclib.GetLogger() + + name := action.String() + "#" + peerID.ID.String() + if dt != nil { + name = action.String() + "." + (*dt).String() + "#" + peerID.ID.String() + } + s.mu.Lock() + defer s.mu.Unlock() + if s.Streams[proto] == nil { + s.Streams[proto] = map[pp.ID]*common.Stream{} + } + + if s.Streams[proto][peerID.ID] == nil { + // should create a very temp stream + ctxTTL, err := context.WithTimeout(context.Background(), 10*time.Second) + if err == nil { + if s.Host.Network().Connectedness(peerID.ID) != network.Connected { + _ = s.Host.Connect(ctxTTL, *peerID) + str, err := s.Host.NewStream(ctxTTL, peerID.ID, ProtocolHeartbeatPartner) + if err == nil { + s.Streams[ProtocolHeartbeatPartner][peerID.ID] = &common.Stream{ + DID: did, + Stream: str, + Expiry: time.Now().UTC().Add(5 * time.Second), + } + str2, err := s.Host.NewStream(ctxTTL, peerID.ID, proto) + if err == nil { + s.Streams[proto][peerID.ID] = &common.Stream{ + DID: did, + Stream: str2, + Expiry: time.Now().UTC().Add(5 * time.Second), + } + } + } + + } + } + return errors.New("no stream available for protocol " + fmt.Sprintf("%v", proto) + " from PID " + peerID.ID.String()) + } + stream := s.Streams[proto][peerID.ID] + + enc := json.NewEncoder(stream.Stream) + + evt := common.NewEvent(name, peerID.ID.String(), dt, user, payload) + if err := enc.Encode(evt); err != nil { + stream.Stream.Close() + logger.Err(err) + return nil + } + return nil +} diff --git a/daemons/node/stream/service.go b/daemons/node/stream/service.go new file mode 100644 index 0000000..7f08392 --- /dev/null +++ b/daemons/node/stream/service.go @@ -0,0 +1,228 @@ +package stream + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "oc-discovery/daemons/node/common" + "sync" + "time" + + oclib "cloud.o-forge.io/core/oc-lib" + "cloud.o-forge.io/core/oc-lib/dbs" + "cloud.o-forge.io/core/oc-lib/models/peer" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/network" + pp "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/protocol" +) + +const ProtocolSearchResource = "/opencloud/resource/search/1.0" +const ProtocolCreateResource = "/opencloud/resource/create/1.0" +const ProtocolUpdateResource = "/opencloud/resource/update/1.0" +const ProtocolDeleteResource = "/opencloud/resource/delete/1.0" + +const ProtocolHeartbeatPartner = "/opencloud/resource/heartbeat/partner/1.0" + +var protocols = []protocol.ID{ + ProtocolSearchResource, + ProtocolCreateResource, + ProtocolUpdateResource, + ProtocolDeleteResource, +} + +type StreamService struct { + Key pp.ID + Host host.Host + Node common.DiscoveryPeer + Streams common.ProtocolStream + maxNodesConn int + mu sync.Mutex + // Stream map[protocol.ID]map[pp.ID]*daemons.Stream +} + +func InitStream(ctx context.Context, h host.Host, key pp.ID, maxNode int, node common.DiscoveryPeer) (*StreamService, error) { + service := &StreamService{ + Key: key, + Node: node, + Host: h, + Streams: common.ProtocolStream{}, + maxNodesConn: maxNode, + } + service.Host.SetStreamHandler(ProtocolHeartbeatPartner, service.HandlePartnerHeartbeat) + service.connectToPartners() // we set up a stream + go service.StartGC(30 * time.Second) + return service, nil +} + +func (s *StreamService) HandlePartnerHeartbeat(stream network.Stream) { + pid, hb, err := common.CheckHeartbeat(s.Host, stream, s.maxNodesConn) + if err != nil { + return + } + s.mu.Lock() + defer s.mu.Unlock() + streams := s.Streams[ProtocolHeartbeatPartner] + if streams == nil { + s.Streams[ProtocolHeartbeatPartner] = map[pp.ID]*common.Stream{} + return + } + // if record already seen update last seen + if rec, ok := streams[*pid]; ok { + rec.DID = hb.DID + rec.Expiry = time.Now().UTC().Add(2 * time.Minute) + } else { // if not in stream ? + pid := stream.Conn().RemotePeer() + ai, err := pp.AddrInfoFromP2pAddr(stream.Conn().RemoteMultiaddr()) + if err == nil { + s.connectToPartner(pid, ai) + } + } + go s.StartGC(30 * time.Second) +} + +func (s *StreamService) connectToPartners() error { + peers, err := s.searchPeer(fmt.Sprintf("%v", peer.PARTNER.EnumIndex())) + if err != nil { + return err + } + for _, p := range peers { + ad, err := pp.AddrInfoFromString(p.StreamAddress) + if err != nil { + return err + } + pid, err := pp.Decode(p.PeerID) + if err != nil { + continue + } + s.connectToPartner(pid, ad) + // heartbeat your partner. + } + // TODO if handle... from partner then HeartBeat back + return nil +} + +func (s *StreamService) connectToPartner(pid pp.ID, ad *pp.AddrInfo) { + for _, proto := range protocols { + f := func(ss network.Stream) { + if s.Streams[proto] == nil { + s.Streams[proto] = map[pp.ID]*common.Stream{} + } + s.Streams[proto][pid] = &common.Stream{ + Stream: ss, + Expiry: time.Now().Add(2 * time.Minute), + } + go s.readLoop(s.Streams[proto][pid]) + } + s.Streams = common.AddStreamProtocol(nil, s.Streams, s.Host, proto, pid, s.Key, &f) + if s.Streams[proto][pid] != nil { + go s.readLoop(s.Streams[proto][pid]) // reaaaad... + } + } + common.SendHeartbeat(context.Background(), ProtocolHeartbeatPartner, + s.Host, s.Streams, []*pp.AddrInfo{ad}, time.Minute) +} + +func (s *StreamService) searchPeer(search string) ([]*peer.Peer, error) { + ps := []*peer.Peer{} + access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil) + peers := access.Search(nil, search, false) + if len(peers.Data) == 0 { + return ps, errors.New("no self available") + } + for _, p := range peers.Data { + ps = append(ps, p.(*peer.Peer)) + } + return ps, nil +} + +func (ix *StreamService) Close() { + for _, s := range ix.Streams { + for _, ss := range s { + ss.Stream.Close() + } + } +} + +func (s *StreamService) StartGC(interval time.Duration) { + go func() { + t := time.NewTicker(interval) + defer t.Stop() + for range t.C { + s.gc() + } + }() +} + +func (s *StreamService) gc() { + s.mu.Lock() + defer s.mu.Unlock() + now := time.Now() + streams := s.Streams[ProtocolHeartbeatPartner] + if streams == nil { + s.Streams[ProtocolHeartbeatPartner] = map[pp.ID]*common.Stream{} + return + } + for pid, rec := range streams { + if now.After(rec.Expiry) { + for _, sstreams := range s.Streams { + if sstreams[pid] != nil { + sstreams[pid].Stream.Close() + delete(sstreams, pid) + } + } + } + } +} + +func (ps *StreamService) readLoop(s *common.Stream) { + dec := json.NewDecoder(s.Stream) + for { + var evt common.Event + if err := dec.Decode(&evt); err != nil { + s.Stream.Close() + return + } + ps.handleEvent(evt.Type, &evt) + } +} + +func (abs *StreamService) FilterPeer(peerID string, search string) *dbs.Filters { + id, err := oclib.GetMySelf() + if err != nil { + return nil + } + filter := map[string][]dbs.Filter{ + "creator_id": {{Operator: dbs.EQUAL.String(), Value: id}}, // is my resource... + "": {{Operator: dbs.OR.String(), Value: &dbs.Filters{ + Or: map[string][]dbs.Filter{ + "abstractobject.access_mode": {{Operator: dbs.EQUAL.String(), Value: 1}}, // if public + "abstractinstanciatedresource.instances": {{Operator: dbs.ELEMMATCH.String(), Value: &dbs.Filters{ // or got a partners instances + And: map[string][]dbs.Filter{ + "resourceinstance.partnerships": {{Operator: dbs.ELEMMATCH.String(), Value: &dbs.Filters{ + And: map[string][]dbs.Filter{ + "resourcepartnership.peer_groups." + peerID: {{Operator: dbs.EXISTS.String(), Value: true}}, + }, + }}}, + }, + }}}, + }, + }}}, + } + if search != "" { + filter[" "] = []dbs.Filter{{Operator: dbs.OR.String(), Value: &dbs.Filters{ + Or: map[string][]dbs.Filter{ // filter by like name, short_description, description, owner, url if no filters are provided + "abstractintanciatedresource.abstractresource.abstractobject.name": {{Operator: dbs.LIKE.String(), Value: search}}, + "abstractintanciatedresource.abstractresource.type": {{Operator: dbs.LIKE.String(), Value: search}}, + "abstractintanciatedresource.abstractresource.short_description": {{Operator: dbs.LIKE.String(), Value: search}}, + "abstractintanciatedresource.abstractresource.description": {{Operator: dbs.LIKE.String(), Value: search}}, + "abstractintanciatedresource.abstractresource.owners.name": {{Operator: dbs.LIKE.String(), Value: search}}, + "abstractintanciatedresource.abstractresource.abstractobject.creator_id": {{Operator: dbs.EQUAL.String(), Value: search}}, + }, + }}} + } + return &dbs.Filters{ + And: filter, + } +} diff --git a/daemons/pubsub/handler.go b/daemons/pubsub/handler.go deleted file mode 100644 index 4e23b7c..0000000 --- a/daemons/pubsub/handler.go +++ /dev/null @@ -1,89 +0,0 @@ -package pubsub - -import ( - "context" - "encoding/json" - "errors" - "oc-discovery/daemons" - "oc-discovery/daemons/dht" - "oc-discovery/models" - - "cloud.o-forge.io/core/oc-lib/models/peer" - "cloud.o-forge.io/core/oc-lib/models/resources" - "cloud.o-forge.io/core/oc-lib/tools" -) - -func (ps *PubSubService) handleEvent(ctx context.Context, topicName string, evt models.Event) error { - action := ps.getTopicName(topicName) - if err := ps.handleEventFromPartner(evt, action); err != nil { - return err - } - if err := ps.handleEventSearch(ctx, evt, action); err != nil { - return err - } - return nil -} - -func (ps *PubSubService) handleEventFromPartner(evt models.Event, action tools.PubSubAction) error { - if !(action == tools.PB_CREATE || action == tools.PB_UPDATE || action == tools.PB_DELETE) { - return nil - } - resource, err := resources.ToResource(int(evt.DataType), evt.Payload) - if err != nil { - return err - } - b, err := json.Marshal(resource) - if err != nil { - return err - } - switch action { - case tools.PB_CREATE: - case tools.PB_UPDATE: - tools.NewNATSCaller().SetNATSPub(tools.CREATE_RESOURCE, tools.NATSResponse{ - FromApp: "oc-discovery", - Datatype: tools.DataType(evt.DataType), - Method: int(tools.CREATE_RESOURCE), - Payload: b, - }) - case tools.PB_DELETE: - tools.NewNATSCaller().SetNATSPub(tools.REMOVE_RESOURCE, tools.NATSResponse{ - FromApp: "oc-discovery", - Datatype: tools.DataType(evt.DataType), - Method: int(tools.REMOVE_RESOURCE), - Payload: b, - }) - default: - return errors.New("no action authorized available : " + action.String()) - } - return nil -} - -func (ps *PubSubService) handleEventSearch( // only : on partner followings. 3 canals for every partner. - ctx context.Context, - evt models.Event, - action tools.PubSubAction, -) error { - if !(action == tools.PB_SEARCH_RESPONSE || action == tools.PB_SEARCH) { - return nil - } - if p, err := dht.GetDHTService().GetPeer(ctx, evt.From); err == nil { - if err := daemons.VerifyPeer([]*peer.Peer{p}, evt); err != nil { - return err - } - switch action { - case tools.PB_SEARCH_RESPONSE: - if err := ps.retrieveResponse(ctx, p, evt); err != nil { - return err - } - case tools.PB_SEARCH: // when someone ask for search. - if p, err := dht.GetDHTService().GetPeer(ctx, evt.From); err == nil { - if err := ps.sendResponse(ctx, p, evt); err != nil { - return err - } - } - default: - return nil - } - } - return nil -} diff --git a/daemons/pubsub/publish.go b/daemons/pubsub/publish.go deleted file mode 100644 index 967f727..0000000 --- a/daemons/pubsub/publish.go +++ /dev/null @@ -1,205 +0,0 @@ -package pubsub - -import ( - "context" - "encoding/json" - "errors" - "oc-discovery/daemons" - "oc-discovery/models" - "time" - - oclib "cloud.o-forge.io/core/oc-lib" - "cloud.o-forge.io/core/oc-lib/dbs" - "cloud.o-forge.io/core/oc-lib/models/peer" - "cloud.o-forge.io/core/oc-lib/tools" -) - -func (ps *PubSubService) SearchPublishEvent( - ctx context.Context, - dt *tools.DataType, - typ string, - user string, - search string, -) error { - switch typ { - case "partner": - ps.searchPartnersPublishEvent( - ctx, dt, user, search, - ) - case "all": - b, err := json.Marshal(map[string]string{ - "search": search, - }) - if err != nil { - return err - } - ps.searchPublishEvent( - ctx, dt, user, "", b, - ) - case "known": - ps.searchKnownPublishEvent( - ctx, dt, user, search, - ) - default: - return errors.New("no type of research found") - } - return nil -} - -func (ps *PubSubService) searchPartnersPublishEvent( - ctx context.Context, - dt *tools.DataType, - user string, - search string, -) error { - access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil) - f := &dbs.Filters{ - And: map[string][]dbs.Filter{ // search by name if no filters are provided - "state": {{Operator: dbs.EQUAL.String(), Value: peer.ONLINE.EnumIndex()}}, - "relation": {{Operator: dbs.EQUAL.String(), Value: peer.PARTNER.EnumIndex()}}, - }, - } - if search != "" { - f.Or = map[string][]dbs.Filter{ // filter by like name, short_description, description, owner, url if no filters are provided - "abstractintanciatedresource.abstractresource.abstractobject.name": {{Operator: dbs.LIKE.String(), Value: search}}, - "abstractintanciatedresource.abstractresource.type": {{Operator: dbs.LIKE.String(), Value: search}}, - "abstractintanciatedresource.abstractresource.short_description": {{Operator: dbs.LIKE.String(), Value: search}}, - "abstractintanciatedresource.abstractresource.description": {{Operator: dbs.LIKE.String(), Value: search}}, - "abstractintanciatedresource.abstractresource.owners.name": {{Operator: dbs.LIKE.String(), Value: search}}, - "abstractintanciatedresource.abstractresource.abstractobject.creator_id": {{Operator: dbs.EQUAL.String(), Value: search}}, - } - } - b, err := json.Marshal(map[string]string{ - "search": search, - }) - if err != nil { - return err - } - peersKnown := access.Search(f, "", false) - for _, known := range peersKnown.Data { - if err := ps.searchPublishEvent(ctx, dt, user, known.GetID(), b); err != nil { - return err - } - } - return nil -} - -func (ps *PubSubService) searchKnownPublishEvent( - ctx context.Context, - dt *tools.DataType, - user string, - search string, -) error { - access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil) - f := &dbs.Filters{ - And: map[string][]dbs.Filter{ // search by name if no filters are provided - "state": {{Operator: dbs.EQUAL.String(), Value: peer.ONLINE.EnumIndex()}}, - "relation": {{Operator: dbs.NOT.String(), Value: &dbs.Filters{ - And: map[string][]dbs.Filter{ - "relation": {{Operator: dbs.EQUAL.String(), Value: peer.BLACKLIST.EnumIndex()}}, - }, - }}}, - }, - } - if search != "" { - f.Or = map[string][]dbs.Filter{ // filter by like name, short_description, description, owner, url if no filters are provided - "abstractintanciatedresource.abstractresource.abstractobject.name": {{Operator: dbs.LIKE.String(), Value: search}}, - "abstractintanciatedresource.abstractresource.type": {{Operator: dbs.LIKE.String(), Value: search}}, - "abstractintanciatedresource.abstractresource.short_description": {{Operator: dbs.LIKE.String(), Value: search}}, - "abstractintanciatedresource.abstractresource.description": {{Operator: dbs.LIKE.String(), Value: search}}, - "abstractintanciatedresource.abstractresource.owners.name": {{Operator: dbs.LIKE.String(), Value: search}}, - "abstractintanciatedresource.abstractresource.abstractobject.creator_id": {{Operator: dbs.EQUAL.String(), Value: search}}, - } - } - b, err := json.Marshal(map[string]string{ - "search": search, - }) - if err != nil { - return err - } - peersKnown := access.Search(f, "", false) - for _, known := range peersKnown.Data { - if err := ps.searchPublishEvent(ctx, dt, user, known.GetID(), b); err != nil { - return err - } - } - return nil -} - -func (ps *PubSubService) searchPublishEvent( - ctx context.Context, - dt *tools.DataType, - user string, - peerID string, - payload []byte, -) error { - id, err := oclib.GenerateNodeID() - if err != nil { - return err - } - if err := ps.subscribeEvents(ctx, dt, tools.PB_SEARCH_RESPONSE, id, 60); err != nil { // TODO Catpure Event ! - return err - } - return ps.publishEvent(ctx, dt, tools.PB_SEARCH, user, peerID, payload, false) -} - -func (ps *PubSubService) ToPartnerPublishEvent( - ctx context.Context, action tools.PubSubAction, dt *tools.DataType, user string, payload []byte) error { - id, err := oclib.GenerateNodeID() - if err != nil { - return err - } - return ps.publishEvent(ctx, dt, action, user, id, payload, false) -} - -func (ps *PubSubService) publishEvent( - ctx context.Context, - dt *tools.DataType, - action tools.PubSubAction, - user string, - peerID string, - payload []byte, - chanNamedByDt bool, -) error { - name := action.String() + "#" + peerID - if chanNamedByDt && dt != nil { // if a datatype is precised then : app.action.datatype#peerID - name = action.String() + "." + (*dt).String() + "#" + peerID - } - - from, err := oclib.GenerateNodeID() - if err != nil { - return err - } - evt := models.Event{ - Type: name, - From: from, - User: user, - Timestamp: time.Now().Unix(), - Payload: payload, - } - if dt != nil { - evt.DataType = int64(dt.EnumIndex()) - } else { - evt.DataType = -1 - } - - body, _ := json.Marshal(evt) - priv, err := daemons.LoadKeyFromFile(false) - if err != nil { - return err - } - sig, _ := priv.Sign(body) - evt.Signature = sig - - msg, _ := json.Marshal(evt) - - topic, err := ps.PS.Join(name) - if err != nil { - return err - } - - return topic.Publish(ctx, msg) -} - -// TODO REVIEW PUBLISHING + ADD SEARCH ON PUBLIC : YES -// TODO : Search should verify DataType diff --git a/daemons/pubsub/service.go b/daemons/pubsub/service.go deleted file mode 100644 index 7fb2078..0000000 --- a/daemons/pubsub/service.go +++ /dev/null @@ -1,127 +0,0 @@ -package pubsub - -import ( - "context" - "encoding/json" - "oc-discovery/models" - "strings" - "sync" - - oclib "cloud.o-forge.io/core/oc-lib" - "cloud.o-forge.io/core/oc-lib/dbs" - "cloud.o-forge.io/core/oc-lib/models/peer" - "cloud.o-forge.io/core/oc-lib/models/resources" - "cloud.o-forge.io/core/oc-lib/tools" - pubsub "github.com/libp2p/go-libp2p-pubsub" -) - -type PubSubService struct { - PS *pubsub.PubSub - Subscription []string - mutex sync.RWMutex -} - -var pubsubSingleton *PubSubService - -func Init(ctx context.Context, ps *pubsub.PubSub) { - pubsubSingleton = &PubSubService{ - PS: ps, - Subscription: []string{}, - } - pubsubSingleton.initSubscribeEvents(ctx) -} - -func GetPubSubService() *PubSubService { - return pubsubSingleton -} - -func (ps *PubSubService) getTopicName(topicName string) tools.PubSubAction { - ns := strings.Split(topicName, ".") - if len(ns) > 0 { - return tools.GetActionString(ns[0]) - } - return tools.NONE -} - -func (abs *PubSubService) retrieveResponse(ctx context.Context, p *peer.Peer, event models.Event) error { - res, err := resources.ToResource(int(event.DataType), event.Payload) - if err != nil || res == nil { - return nil - } - b, err := json.Marshal(res.Serialize(res)) - tools.NewNATSCaller().SetNATSPub(tools.CATALOG_SEARCH_EVENT, tools.NATSResponse{ - FromApp: "oc-discovery", - Datatype: tools.DataType(event.DataType), - Method: int(tools.CATALOG_SEARCH_EVENT), - Payload: b, - }) - return nil -} - -func (abs *PubSubService) sendResponse(ctx context.Context, p *peer.Peer, event models.Event) error { - dts := []oclib.LibDataEnum{oclib.LibDataEnum(event.DataType)} - if event.DataType == -1 { // expect all resources - dts = []oclib.LibDataEnum{oclib.LibDataEnum(oclib.COMPUTE_RESOURCE), oclib.LibDataEnum(oclib.STORAGE_RESOURCE), - oclib.LibDataEnum(oclib.PROCESSING_RESOURCE), oclib.LibDataEnum(oclib.DATA_RESOURCE), oclib.LibDataEnum(oclib.WORKFLOW_RESOURCE)} - } - var m map[string]string - err := json.Unmarshal(event.Payload, &m) - if err != nil { - return err - } - for _, dt := range dts { - access := oclib.NewRequestAdmin(oclib.LibDataEnum(event.DataType), nil) - peerID := p.GetID() - searched := access.Search(abs.filterPeer(peerID, m["search"]), "", false) - for _, ss := range searched.Data { - if j, err := json.Marshal(ss); err == nil { - if event.DataType != -1 { - ndt := tools.DataType(dt.EnumIndex()) - abs.publishEvent(ctx, &ndt, tools.PB_SEARCH_RESPONSE, event.User, peerID, j, true) - } else { - abs.publishEvent(ctx, nil, tools.PB_SEARCH_RESPONSE, event.User, peerID, j, true) - } - } - } - } - return nil -} - -func (abs *PubSubService) filterPeer(peerID string, search string) *dbs.Filters { - id, err := oclib.GetMySelf() - if err != nil { - return nil - } - filter := map[string][]dbs.Filter{ - "creator_id": {{Operator: dbs.EQUAL.String(), Value: id}}, // is my resource... - "": {{Operator: dbs.OR.String(), Value: &dbs.Filters{ - Or: map[string][]dbs.Filter{ - "abstractobject.access_mode": {{Operator: dbs.EQUAL.String(), Value: 1}}, // if public - "abstractinstanciatedresource.instances": {{Operator: dbs.ELEMMATCH.String(), Value: &dbs.Filters{ // or got a partners instances - And: map[string][]dbs.Filter{ - "resourceinstance.partnerships": {{Operator: dbs.ELEMMATCH.String(), Value: &dbs.Filters{ - And: map[string][]dbs.Filter{ - "resourcepartnership.peer_groups." + peerID: {{Operator: dbs.EXISTS.String(), Value: true}}, - }, - }}}, - }, - }}}, - }, - }}}, - } - if search != "" { - filter[" "] = []dbs.Filter{{Operator: dbs.OR.String(), Value: &dbs.Filters{ - Or: map[string][]dbs.Filter{ // filter by like name, short_description, description, owner, url if no filters are provided - "abstractintanciatedresource.abstractresource.abstractobject.name": {{Operator: dbs.LIKE.String(), Value: search}}, - "abstractintanciatedresource.abstractresource.type": {{Operator: dbs.LIKE.String(), Value: search}}, - "abstractintanciatedresource.abstractresource.short_description": {{Operator: dbs.LIKE.String(), Value: search}}, - "abstractintanciatedresource.abstractresource.description": {{Operator: dbs.LIKE.String(), Value: search}}, - "abstractintanciatedresource.abstractresource.owners.name": {{Operator: dbs.LIKE.String(), Value: search}}, - "abstractintanciatedresource.abstractresource.abstractobject.creator_id": {{Operator: dbs.EQUAL.String(), Value: search}}, - }, - }}} - } - return &dbs.Filters{ - And: filter, - } -} diff --git a/go.mod b/go.mod index 5d0bc51..b91bb11 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module oc-discovery go 1.24.6 require ( - cloud.o-forge.io/core/oc-lib v0.0.0-20260128162702-97cf629e27ec + cloud.o-forge.io/core/oc-lib v0.0.0-20260129122033-186ba3e689c7 github.com/beego/beego v1.12.13 github.com/beego/beego/v2 v2.3.8 github.com/go-redis/redis v6.15.9+incompatible diff --git a/go.sum b/go.sum index a927c23..19cca33 100644 --- a/go.sum +++ b/go.sum @@ -24,6 +24,10 @@ cloud.o-forge.io/core/oc-lib v0.0.0-20260128160440-c0d89ea9e1e8 h1:h7VHJktaTT8Tx cloud.o-forge.io/core/oc-lib v0.0.0-20260128160440-c0d89ea9e1e8/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI= cloud.o-forge.io/core/oc-lib v0.0.0-20260128162702-97cf629e27ec h1:/uvrtEt7A5rwqFPHH8yjujlC33HMjQHhWDIK6I08DrA= cloud.o-forge.io/core/oc-lib v0.0.0-20260128162702-97cf629e27ec/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI= +cloud.o-forge.io/core/oc-lib v0.0.0-20260129121215-c1519f6b26b8 h1:gvUbTwHnYM0Ezzvoa9ylTt+o1lAhS0U79OogbsZ+Pl8= +cloud.o-forge.io/core/oc-lib v0.0.0-20260129121215-c1519f6b26b8/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI= +cloud.o-forge.io/core/oc-lib v0.0.0-20260129122033-186ba3e689c7 h1:NRFGRqN+j5g3DrtXMYN5T5XSYICG+OU2DisjBdID3j8= +cloud.o-forge.io/core/oc-lib v0.0.0-20260129122033-186ba3e689c7/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/Knetic/govaluate v3.0.0+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= diff --git a/main.go b/main.go index fd4110e..8a337ca 100644 --- a/main.go +++ b/main.go @@ -2,31 +2,37 @@ package main import ( "context" + "log" "oc-discovery/conf" - "oc-discovery/daemons/dht" - "oc-discovery/daemons/pubsub" + "oc-discovery/daemons/node" + "os" + "os/signal" + "strings" + "syscall" oclib "cloud.o-forge.io/core/oc-lib" beego "github.com/beego/beego/v2/server/web" "github.com/beego/beego/v2/server/web/filter/cors" - pss "github.com/libp2p/go-libp2p-pubsub" ) const appname = "oc-discovery" func main() { - logger := oclib.GetLogger() // Init the oc-lib oclib.Init(appname) // get the right config file o := oclib.GetConfLoader() + conf.GetConfig().Name = o.GetStringDefault("NAME", "opencloud-demo") + conf.GetConfig().Hostname = o.GetStringDefault("HOSTNAME", "localhost") conf.GetConfig().PSKPath = o.GetStringDefault("PSK_PATH", "./psk/psk") - conf.GetConfig().DHTEndpointPort = o.GetInt64Default("DHT_ENDPOINT_PORT", 4001) + conf.GetConfig().NodeEndpointPort = o.GetInt64Default("NODE_ENDPOINT_PORT", 4001) conf.GetConfig().PublicKeyPath = o.GetStringDefault("PUBLIC_KEY_PATH", "./pem/public.pem") conf.GetConfig().PrivateKeyPath = o.GetStringDefault("PRIVATE_KEY_PATH", "./pem/private.pem") - conf.GetConfig().BootstrapAddresses = o.GetStringDefault("BOOTSTRAP_ADDRESSES", "") + conf.GetConfig().IndexerAddresses = o.GetStringDefault("INDEXER_ADDRESSES", "") + + conf.GetConfig().NodeMode = o.GetStringDefault("NODE_MODE", "node") // Normal beego init beego.BConfig.AppName = appname @@ -40,20 +46,23 @@ func main() { ExposeHeaders: []string{"Content-Length", "Content-Type"}, AllowCredentials: true, })) - ctx := context.Background() - DHT, err := dht.Init(ctx) - if err == nil { - logger.Err(err) + + ctx, stop := signal.NotifyContext( + context.Background(), + os.Interrupt, + syscall.SIGTERM, + ) + defer stop() + + isNode := strings.Contains(conf.GetConfig().NodeMode, "node") + isIndexer := strings.Contains(conf.GetConfig().NodeMode, "indexer") + + if n, err := node.InitNode(isNode, isIndexer); err != nil { + panic(err) } else { - ps, err := pss.NewGossipSub( - ctx, DHT.Host, - pss.WithMessageSigning(true), - pss.WithStrictSignatureVerification(true), - ) - pubsub.Init(ctx, ps) - if err != nil { - logger.Err(err) - } + <-ctx.Done() // 👈 the only blocking point + log.Println("shutting down") + n.Close() } - beego.Run() + } diff --git a/models/event.go b/models/event.go index 7feb9b6..dbdc2a6 100644 --- a/models/event.go +++ b/models/event.go @@ -1,6 +1,12 @@ package models -import "encoding/json" +import ( + "encoding/json" + "time" + + "cloud.o-forge.io/core/oc-lib/tools" + "github.com/libp2p/go-libp2p/core/crypto" +) type Event struct { Type string `json:"type"` @@ -14,6 +20,26 @@ type Event struct { Signature []byte `json:"sig"` } +func NewEvent(name string, from string, dt *tools.DataType, user string, payload []byte, priv crypto.PrivKey) *Event { + evt := &Event{ + Type: name, + From: from, + User: user, + Timestamp: time.Now().Unix(), + Payload: payload, + } + if dt != nil { + evt.DataType = int64(dt.EnumIndex()) + } else { + evt.DataType = -1 + } + + body, _ := json.Marshal(evt) + sig, _ := priv.Sign(body) + evt.Signature = sig + return evt +} + func (e *Event) RawEvent() *Event { return &Event{ Type: e.Type,