diff --git a/daemons/node/common/crypto.go b/daemons/node/common/crypto.go index 945581a..4da2c6f 100644 --- a/daemons/node/common/crypto.go +++ b/daemons/node/common/crypto.go @@ -96,7 +96,6 @@ func LoadPSKFromFile() (pnet.PSK, error) { if err != nil { return nil, err } - fmt.Println("sqqdsqsqd") psk, err := pnet.DecodeV1PSK(bytes.NewReader(data)) if err != nil { return nil, err diff --git a/daemons/node/indexer/handler.go b/daemons/node/indexer/handler.go index f9547ae..b7f6dea 100644 --- a/daemons/node/indexer/handler.go +++ b/daemons/node/indexer/handler.go @@ -26,6 +26,8 @@ type PeerRecord struct { WalletAddress string `json:"wallet_address"` Signature []byte `json:"signature"` ExpiryDate time.Time `json:"expiry_date"` + + TTL int `json:"ttl"` // max of hop diffusion } func (p *PeerRecord) Sign() error { @@ -155,6 +157,19 @@ func (ix *IndexerService) handleNodePublish(s network.Stream) { LastSeen: time.Now(), } } + + if rec.TTL > 0 { + for _, ad := range common.StaticIndexers { + if common.StreamIndexers[common.ProtocolPublish][ad.ID] == nil { + continue + } + stream := common.StreamIndexers[common.ProtocolPublish][ad.ID] + rec.TTL -= 1 + if err := json.NewEncoder(stream.Stream).Encode(&rec); err != nil { // then publish on stream + continue + } + } + } } func (ix *IndexerService) handleNodeGet(s network.Stream) { @@ -184,6 +199,33 @@ func (ix *IndexerService) handleNodeGet(s network.Stream) { return } } + // if not found ask to my neighboor indexers + if common.StreamIndexers[common.ProtocolPublish] == nil { + _ = json.NewEncoder(s).Encode(GetResponse{Found: false}) + return + } + for _, ad := range common.StaticIndexers { + if common.StreamIndexers[common.ProtocolPublish][ad.ID] == nil { + continue + } + stream := common.StreamIndexers[common.ProtocolPublish][ad.ID] + if err := json.NewEncoder(stream.Stream).Encode(GetValue{Key: req.Key}); err != nil { + continue + } + + var resp GetResponse + if err := json.NewDecoder(stream.Stream).Decode(&resp); err != nil { + continue + } + if resp.Found { + _ = json.NewEncoder(s).Encode(GetResponse{ + Found: true, + Record: resp.Record, + }) + return + } + } + // Not found _ = json.NewEncoder(s).Encode(GetResponse{Found: false}) } diff --git a/daemons/nats.go b/daemons/node/nats.go similarity index 93% rename from daemons/nats.go rename to daemons/node/nats.go index d1b929e..1f0bb30 100644 --- a/daemons/nats.go +++ b/daemons/node/nats.go @@ -1,15 +1,14 @@ -package daemons +package node import ( "context" "encoding/json" "fmt" - "oc-discovery/daemons/node" "cloud.o-forge.io/core/oc-lib/tools" ) -func ListenNATS(n node.Node) { +func ListenNATS(n Node) { tools.NewNATSCaller().ListenNats(map[tools.NATSMethod]func(tools.NATSResponse){ tools.PROPALGATION_EVENT: func(resp tools.NATSResponse) { var propalgation tools.PropalgationMessage diff --git a/daemons/node/node.go b/daemons/node/node.go index 1e62f08..ef913f6 100644 --- a/daemons/node/node.go +++ b/daemons/node/node.go @@ -92,6 +92,8 @@ func InitNode(isNode bool, isIndexer bool) (*Node, error) { logger.Info().Msg("generate opencloud indexer...") node.IndexerService = indexer.NewIndexerService(node.Host, ps, 5) } + logger.Info().Msg("connect to NATS") + ListenNATS(*node) logger.Info().Msg("Node is actually running.") return node, nil } @@ -131,7 +133,7 @@ func (d *Node) publishPeerRecord( rec.ExpiryDate = base.ExpiryDate rec.Signature, err = priv.Sign(hash[:]) - + rec.TTL = 2 if err := json.NewEncoder(stream.Stream).Encode(&rec); err != nil { // then publish on stream return err } diff --git a/daemons/node/pubsub/publish.go b/daemons/node/pubsub/publish.go index 19c927c..9f63d5f 100644 --- a/daemons/node/pubsub/publish.go +++ b/daemons/node/pubsub/publish.go @@ -8,7 +8,6 @@ import ( "oc-discovery/models" oclib "cloud.o-forge.io/core/oc-lib" - "cloud.o-forge.io/core/oc-lib/dbs" "cloud.o-forge.io/core/oc-lib/tools" ) @@ -67,16 +66,5 @@ func (ps *PubSubService) publishEvent( return topic.Publish(ctx, msg) } -func (pc *PubSubService) getDefaultFilter(search string) map[string][]dbs.Filter { - return map[string][]dbs.Filter{ // filter by like name, short_description, description, owner, url if no filters are provided - "abstractintanciatedresource.abstractresource.abstractobject.name": {{Operator: dbs.LIKE.String(), Value: search}}, - "abstractintanciatedresource.abstractresource.type": {{Operator: dbs.LIKE.String(), Value: search}}, - "abstractintanciatedresource.abstractresource.short_description": {{Operator: dbs.LIKE.String(), Value: search}}, - "abstractintanciatedresource.abstractresource.description": {{Operator: dbs.LIKE.String(), Value: search}}, - "abstractintanciatedresource.abstractresource.owners.name": {{Operator: dbs.LIKE.String(), Value: search}}, - "abstractintanciatedresource.abstractresource.abstractobject.creator_id": {{Operator: dbs.EQUAL.String(), Value: search}}, - } -} - // TODO REVIEW PUBLISHING + ADD SEARCH ON PUBLIC : YES // TODO : Search should verify DataType diff --git a/daemons/node/stream/publish.go b/daemons/node/stream/publish.go index 24423e0..85548b5 100644 --- a/daemons/node/stream/publish.go +++ b/daemons/node/stream/publish.go @@ -9,6 +9,7 @@ import ( "time" oclib "cloud.o-forge.io/core/oc-lib" + "cloud.o-forge.io/core/oc-lib/dbs" "cloud.o-forge.io/core/oc-lib/models/peer" "cloud.o-forge.io/core/oc-lib/tools" "github.com/libp2p/go-libp2p/core/network" @@ -33,7 +34,15 @@ func (ps *StreamService) PublishResources(dt *tools.DataType, user string, toPee func (ps *StreamService) SearchKnownPublishEvent(dt *tools.DataType, user string, search string) error { access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil) - peers := access.Search(nil, search, false) + peers := access.Search(&dbs.Filters{ // filter by like name, short_description, description, owner, url if no filters are provided + And: map[string][]dbs.Filter{ + "": {{Operator: dbs.NOT.String(), Value: dbs.Filters{ // filter by like name, short_description, description, owner, url if no filters are provided + And: map[string][]dbs.Filter{ + "relation": {{Operator: dbs.EQUAL.String(), Value: peer.BLACKLIST}}, + }, + }}}, + }, + }, search, false) if peers.Err != "" { return errors.New(peers.Err) } else {