diff --git a/daemons/node/common/common_service.go b/daemons/node/common/common_service.go index fd37bdc..4778362 100644 --- a/daemons/node/common/common_service.go +++ b/daemons/node/common/common_service.go @@ -42,7 +42,10 @@ type LongLivedStreamRecordedService[T interface{}] struct { // remotePeer is the peer that sent the heartbeat (used for offload routing). // need is how many more indexers the node wants (from hb.Need). // referent is true when the node designated this indexer as its search referent. - BuildHeartbeatResponse func(remotePeer pp.ID, need int, challenges []string, challengeDID string, referent bool) *HeartbeatResponse + // rawRecord is the fresh signed PeerRecord embedded in the heartbeat (hb.Record), + // passed directly so the handler does not race with AfterHeartbeat goroutine + // updating StreamRecord.Record. + BuildHeartbeatResponse func(remotePeer pp.ID, need int, challenges []string, challengeDID string, referent bool, rawRecord json.RawMessage) *HeartbeatResponse } func (ix *LongLivedStreamRecordedService[T]) MaxNodesConn() int { @@ -240,11 +243,11 @@ func (ix *LongLivedStreamRecordedService[T]) HandleHeartbeat(s network.Stream) { ix.StreamMU.RUnlock() } if ix.AfterHeartbeat != nil && hb.DID != "" { - ix.AfterHeartbeat(hb) + go ix.AfterHeartbeat(hb) } // Send response back to the node (bidirectional heartbeat). if ix.BuildHeartbeatResponse != nil { - if resp := ix.BuildHeartbeatResponse(s.Conn().RemotePeer(), hb.Need, hb.Challenges, hb.ChallengeDID, hb.Referent); resp != nil { + if resp := ix.BuildHeartbeatResponse(s.Conn().RemotePeer(), hb.Need, hb.Challenges, hb.ChallengeDID, hb.Referent, hb.Record); resp != nil { s.SetWriteDeadline(time.Now().Add(3 * time.Second)) json.NewEncoder(s).Encode(resp) s.SetWriteDeadline(time.Time{}) diff --git a/daemons/node/common/common_stream.go b/daemons/node/common/common_stream.go index b2f5cb5..d15ec5a 100644 --- a/daemons/node/common/common_stream.go +++ b/daemons/node/common/common_stream.go @@ -138,8 +138,7 @@ func sendHeartbeat(ctx context.Context, h host.Host, proto protocol.ID, p *pp.Ad logger.Info().Msg("New Stream engaged as Heartbeat " + fmt.Sprintf("%v", proto) + " " + p.ID.String()) s, err := h.NewStream(ctx, p.ID, proto) if err != nil { - fmt.Println(s, err, p.ID) - logger.Err(err) + logger.Err(err).Msg(err.Error()) return nil, 0, err } pss = &Stream{ diff --git a/daemons/node/connection_gater.go b/daemons/node/connection_gater.go index b00af77..02385f0 100644 --- a/daemons/node/connection_gater.go +++ b/daemons/node/connection_gater.go @@ -1,16 +1,6 @@ package node import ( - "context" - "encoding/json" - "time" - - "oc-discovery/daemons/node/common" - "oc-discovery/daemons/node/indexer" - - oclib "cloud.o-forge.io/core/oc-lib" - "cloud.o-forge.io/core/oc-lib/dbs" - "cloud.o-forge.io/core/oc-lib/models/peer" "github.com/libp2p/go-libp2p/core/control" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" @@ -18,13 +8,10 @@ import ( ma "github.com/multiformats/go-multiaddr" ) -// OCConnectionGater enforces two rules on every inbound connection: -// 1. If the peer is known locally and blacklisted → reject. -// 2. If the peer is unknown locally → ask indexers one by one whether it -// exists in the DHT. Accept as soon as one confirms it; reject if none do -// (or if no indexers are reachable yet, allow optimistically). -// -// Outbound connections are always allowed — we chose to dial them. +// OCConnectionGater allows all connections unconditionally. +// Peer validation (local DB + DHT by peer_id) is enforced at the stream level +// in each handler, so ProtocolHeartbeat and ProtocolPublish — through which a +// node first registers itself — are never blocked. type OCConnectionGater struct { host host.Host } @@ -33,105 +20,12 @@ func newOCConnectionGater(h host.Host) *OCConnectionGater { return &OCConnectionGater{host: h} } -// InterceptPeerDial — allow all outbound dials. -func (g *OCConnectionGater) InterceptPeerDial(_ pp.ID) bool { return true } - -// InterceptAddrDial — allow all outbound dials. -func (g *OCConnectionGater) InterceptAddrDial(_ pp.ID, _ ma.Multiaddr) bool { return true } - -// InterceptAccept — allow at transport level (PeerID not yet known). -func (g *OCConnectionGater) InterceptAccept(_ network.ConnMultiaddrs) bool { return true } - -// InterceptUpgraded — final gate; always allow (decisions already made in InterceptSecured). +func (g *OCConnectionGater) InterceptPeerDial(_ pp.ID) bool { return true } +func (g *OCConnectionGater) InterceptAddrDial(_ pp.ID, _ ma.Multiaddr) bool { return true } +func (g *OCConnectionGater) InterceptAccept(_ network.ConnMultiaddrs) bool { return true } +func (g *OCConnectionGater) InterceptSecured(_ network.Direction, _ pp.ID, _ network.ConnMultiaddrs) bool { + return true +} func (g *OCConnectionGater) InterceptUpgraded(_ network.Conn) (bool, control.DisconnectReason) { return true, 0 } - -// InterceptSecured is called after the cryptographic handshake — PeerID is now known. -// Only inbound connections are verified; outbound are trusted. -func (g *OCConnectionGater) InterceptSecured(dir network.Direction, pid pp.ID, _ network.ConnMultiaddrs) bool { - if dir == network.DirOutbound { - return true - } - logger := oclib.GetLogger() - - // 1. Local DB lookup by PeerID. - access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil) - results := access.Search(&dbs.Filters{ - And: map[string][]dbs.Filter{ // search by name if no filters are provided - "peer_id": {{Operator: dbs.EQUAL.String(), Value: pid.String()}}, - }, - }, pid.String(), false) - for _, item := range results.Data { - p, ok := item.(*peer.Peer) - if !ok || p.PeerID != pid.String() { - continue - } - if p.Relation == peer.BLACKLIST { - logger.Warn().Str("peer", pid.String()).Msg("[gater] rejected blacklisted peer") - return false - } - // Known, not blacklisted. - return true - } - - // 2. Unknown locally — verify via indexers. - indexers := common.Indexers.GetAddrs() - - if len(indexers) == 0 { - // No indexers reachable yet — allow optimistically (bootstrap phase). - logger.Warn().Str("peer", pid.String()).Msg("[gater] no indexers available, allowing unverified inbound") - return true - } - - req := indexer.GetValue{PeerID: pid.String()} - // A single DHT GetValue already traverses the entire DHT network, so asking - // a second indexer would yield the same result. We only fall through to the - // next indexer if the current one is unreachable (transport error), not if - // it returns found=false (that answer is already DHT-wide authoritative). - for _, ai := range indexers { - found, reachable := queryIndexerPeerExists(g.host, *ai.Info, req) - if !reachable { - continue // indexer down — try next - } - if !found { - logger.Warn().Str("peer", pid.String()).Msg("[gater] peer not found in DHT, rejecting inbound") - } - return found // definitive DHT answer - } - - // All indexers unreachable — allow optimistically rather than blocking indefinitely. - logger.Warn().Str("peer", pid.String()).Msg("[gater] all indexers unreachable, allowing unverified inbound") - return true -} - -// queryIndexerPeerExists opens a fresh one-shot stream to ai, sends a GetValue -// request, and returns (found, reachable). -// reachable=false means the indexer could not be reached (transport error); -// the caller should then try another indexer. -// reachable=true means the indexer answered — found is the DHT-wide authoritative result. -func queryIndexerPeerExists(h host.Host, ai pp.AddrInfo, req indexer.GetValue) (found, reachable bool) { - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - defer cancel() - - if h.Network().Connectedness(ai.ID) != network.Connected { - if err := h.Connect(ctx, ai); err != nil { - return false, false - } - } - s, err := h.NewStream(ctx, ai.ID, common.ProtocolGet) - if err != nil { - return false, false - } - defer s.Close() - s.SetDeadline(time.Now().Add(3 * time.Second)) - - if err := json.NewEncoder(s).Encode(req); err != nil { - return false, false - } - var resp indexer.GetResponse - if err := json.NewDecoder(s).Decode(&resp); err != nil { - return false, false - } - return resp.Found, true -} diff --git a/daemons/node/indexer/handler.go b/daemons/node/indexer/handler.go index 7181d44..f110e8c 100644 --- a/daemons/node/indexer/handler.go +++ b/daemons/node/indexer/handler.go @@ -12,6 +12,7 @@ import ( "time" oclib "cloud.o-forge.io/core/oc-lib" + "cloud.o-forge.io/core/oc-lib/dbs" pp "cloud.o-forge.io/core/oc-lib/models/peer" "cloud.o-forge.io/core/oc-lib/models/utils" "cloud.o-forge.io/core/oc-lib/tools" @@ -108,6 +109,49 @@ func (ix *IndexerService) genPIDKey(peerID string) string { return "/pid/" + peerID } +// isPeerKnown is the stream-level gate: returns true if pid is allowed. +// Check order (fast → slow): +// 1. In-memory stream records — currently heartbeating to this indexer. +// 2. Local DB by peer_id — known peer, blacklist enforced here. +// 3. DHT /pid/{peerID} → /node/{DID} — registered on any indexer. +// +// ProtocolHeartbeat and ProtocolPublish handlers do NOT call this — they are +// the streams through which a node first makes itself known. +func (ix *IndexerService) isPeerKnown(pid lpp.ID) bool { + // 1. Fast path: active heartbeat session. + ix.StreamMU.RLock() + _, active := ix.StreamRecords[common.ProtocolHeartbeat][pid] + ix.StreamMU.RUnlock() + if active { + return true + } + // 2. Local DB: known peer (handles blacklist). + access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil) + results := access.Search(&dbs.Filters{ + And: map[string][]dbs.Filter{ + "peer_id": {{Operator: dbs.EQUAL.String(), Value: pid.String()}}, + }, + }, pid.String(), false) + for _, item := range results.Data { + p, ok := item.(*pp.Peer) + if !ok || p.PeerID != pid.String() { + continue + } + return p.Relation != pp.BLACKLIST + } + // 3. DHT lookup by peer_id. + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + did, err := ix.DHT.GetValue(ctx, ix.genPIDKey(pid.String())) + cancel() + if err != nil || len(did) == 0 { + return false + } + ctx2, cancel2 := context.WithTimeout(context.Background(), 3*time.Second) + _, err = ix.DHT.GetValue(ctx2, ix.genKey(string(did))) + cancel2() + return err == nil +} + func (ix *IndexerService) initNodeHandler() { logger := oclib.GetLogger() logger.Info().Msg("Init Node Handler") @@ -144,21 +188,32 @@ func (ix *IndexerService) initNodeHandler() { logger.Warn().Err(err).Str("did", rec.DID).Msg("indexer: heartbeat record signature invalid") return } + // Keep StreamRecord.Record in sync so BuildHeartbeatResponse always + // sees a populated PeerRecord (Name, DID, etc.) regardless of whether + // handleNodePublish ran before or after the heartbeat stream was opened. + if pid, err := lpp.Decode(rec.PeerID); err == nil { + ix.StreamMU.Lock() + if srec, ok := ix.StreamRecords[common.ProtocolHeartbeat][pid]; ok { + srec.Record = rec + } + ix.StreamMU.Unlock() + } data, err := json.Marshal(rec) if err != nil { return } ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - logger.Info().Msg("REFRESH PutValue " + ix.genKey(rec.DID)) if err := ix.DHT.PutValue(ctx, ix.genKey(rec.DID), data); err != nil { - logger.Warn().Err(err).Str("did", rec.DID).Msg("indexer: DHT refresh failed") - cancel() - return + logger.Warn().Err(err).Str("did", rec.DID).Msg("indexer: DHT refresh /node/ failed") } cancel() + // /pid/ is written unconditionally — the gater queries by PeerID and this + // index must stay fresh regardless of whether the /node/ write succeeded. if rec.PeerID != "" { ctx2, cancel2 := context.WithTimeout(context.Background(), 10*time.Second) - ix.DHT.PutValue(ctx2, ix.genPIDKey(rec.PeerID), []byte(rec.DID)) + if err := ix.DHT.PutValue(ctx2, ix.genPIDKey(rec.PeerID), []byte(rec.DID)); err != nil { + logger.Warn().Err(err).Str("pid", rec.PeerID).Msg("indexer: DHT refresh /pid/ failed") + } cancel2() } } @@ -173,6 +228,12 @@ func (ix *IndexerService) initNodeHandler() { // Returns a random sample of indexers from the local DHT cache. func (ix *IndexerService) handleCandidateRequest(s network.Stream) { defer s.Close() + if !ix.isPeerKnown(s.Conn().RemotePeer()) { + logger := oclib.GetLogger() + logger.Warn().Str("peer", s.Conn().RemotePeer().String()).Msg("[candidates] unknown peer, rejecting stream") + s.Reset() + return + } s.SetDeadline(time.Now().Add(5 * time.Second)) var req common.IndexerCandidatesRequest if err := json.NewDecoder(s).Decode(&req); err != nil { @@ -280,6 +341,11 @@ func (ix *IndexerService) handleNodeGet(s network.Stream) { defer s.Close() logger := oclib.GetLogger() remotePeer := s.Conn().RemotePeer() + if !ix.isPeerKnown(remotePeer) { + logger.Warn().Str("peer", remotePeer.String()).Msg("[get] unknown peer, rejecting stream") + s.Reset() + return + } if err := ix.behavior.RecordGet(remotePeer); err != nil { logger.Warn().Err(err).Str("peer", remotePeer.String()).Msg("get refused") s.Reset() diff --git a/daemons/node/indexer/search.go b/daemons/node/indexer/search.go index 9be3bf3..57904a4 100644 --- a/daemons/node/indexer/search.go +++ b/daemons/node/indexer/search.go @@ -74,6 +74,11 @@ func (ix *IndexerService) handleSearchPeer(s network.Stream) { logger := oclib.GetLogger() defer s.Reset() + if !ix.isPeerKnown(s.Conn().RemotePeer()) { + logger.Warn().Str("peer", s.Conn().RemotePeer().String()).Msg("[search] unknown peer, rejecting stream") + return + } + var req common.SearchPeerRequest if err := json.NewDecoder(s).Decode(&req); err != nil || req.QueryID == "" { return diff --git a/daemons/node/indexer/service.go b/daemons/node/indexer/service.go index 260e768..31a439c 100644 --- a/daemons/node/indexer/service.go +++ b/daemons/node/indexer/service.go @@ -162,7 +162,7 @@ func NewIndexerService(h host.Host, ps *pubsub.PubSub, maxNode int) *IndexerServ // Build and send a HeartbeatResponse after each received node heartbeat. // Raw metrics only — no pre-cooked score. Node computes the score itself. - ix.BuildHeartbeatResponse = func(remotePeer pp.ID, need int, challenges []string, challengeDID string, referent bool) *common.HeartbeatResponse { + ix.BuildHeartbeatResponse = func(remotePeer pp.ID, need int, challenges []string, challengeDID string, referent bool, rawRecord json.RawMessage) *common.HeartbeatResponse { ix.StreamMU.RLock() peerCount := len(ix.StreamRecords[common.ProtocolHeartbeat]) // Collect lastSeen per active peer for challenge responses. @@ -184,6 +184,16 @@ func NewIndexerService(h host.Host, ps *pubsub.PubSub, maxNode int) *IndexerServ } ix.StreamMU.RUnlock() + // AfterHeartbeat updates srec.Record asynchronously — it may not have run yet. + // Use rawRecord (the fresh signed PeerRecord embedded in the heartbeat) directly + // so referencedNodes always gets the current Name/DID regardless of timing. + if remotePeerRecord.Name == "" && len(rawRecord) > 0 { + var fresh PeerRecord + if json.Unmarshal(rawRecord, &fresh) == nil { + remotePeerRecord = fresh + } + } + // Update referent designation: node marks its best-scored indexer with Referent=true. ix.updateReferent(remotePeer, remotePeerRecord, referent) diff --git a/daemons/node/nats.go b/daemons/node/nats.go index 1ef905e..34e0d37 100644 --- a/daemons/node/nats.go +++ b/daemons/node/nats.go @@ -6,10 +6,10 @@ import ( "fmt" "oc-discovery/daemons/node/common" "oc-discovery/daemons/node/stream" + "slices" oclib "cloud.o-forge.io/core/oc-lib" "cloud.o-forge.io/core/oc-lib/config" - "cloud.o-forge.io/core/oc-lib/models/peer" "cloud.o-forge.io/core/oc-lib/tools" pp "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" @@ -25,81 +25,17 @@ type executionConsidersPayload struct { func ListenNATS(n *Node) { tools.NewNATSCaller().ListenNats(map[tools.NATSMethod]func(tools.NATSResponse){ - /*tools.VERIFY_RESOURCE: func(resp tools.NATSResponse) { - if resp.FromApp == config.GetAppName() { - return - } - if res, err := resources.ToResource(resp.Datatype.EnumIndex(), resp.Payload); err == nil { - access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil) - p := access.LoadOne(res.GetCreatorID()) - realP := p.ToPeer() - if realP == nil { - return - } else if realP.Relation == peer.SELF { - pubKey, err := common.PubKeyFromString(realP.PublicKey) // extract pubkey from pubkey str - if err != nil { - return - } - ok, _ := pubKey.Verify(resp.Payload, res.GetSignature()) - if b, err := json.Marshal(stream.Verify{ - IsVerified: ok, - }); err == nil { - tools.NewNATSCaller().SetNATSPub(tools.VERIFY_RESOURCE, tools.NATSResponse{ - FromApp: "oc-discovery", - Method: int(tools.VERIFY_RESOURCE), - Payload: b, - }) - } - } else if realP.Relation != peer.BLACKLIST { - n.StreamService.PublishVerifyResources(&resp.Datatype, resp.User, realP.PeerID, resp.Payload) - } - } - },*/ - tools.CREATE_RESOURCE: func(resp tools.NATSResponse) { - if resp.FromApp == config.GetAppName() && resp.Datatype != tools.PEER && resp.Datatype != tools.WORKFLOW { - return - } - logger := oclib.GetLogger() - m := map[string]interface{}{} - err := json.Unmarshal(resp.Payload, &m) - if err != nil { - logger.Err(err) - return - } - p := &peer.Peer{} - p = p.Deserialize(m, p).(*peer.Peer) - - ad, err := pp.AddrInfoFromString(p.StreamAddress) - if err != nil { - return - } - // Non-partner: close any existing streams for this peer. - if p.Relation != peer.PARTNER { - n.StreamService.Mu.Lock() - defer n.StreamService.Mu.Unlock() - ps := common.ProtocolStream{} - for proto, s := range n.StreamService.Streams { - m := map[pp.ID]*common.Stream{} - for k := range s { - if ad.ID != k { - m[k] = s[k] - } else { - s[k].Stream.Close() - } - } - ps[proto] = m - } - n.StreamService.Streams = ps - } - - }, tools.PROPALGATION_EVENT: func(resp tools.NATSResponse) { fmt.Println("PROPALGATION") if resp.FromApp == config.GetAppName() { return } + p, err := oclib.GetMySelf() + if err != nil || p == nil || p.PeerID != n.PeerID.String() { + return + } var propalgation tools.PropalgationMessage - err := json.Unmarshal(resp.Payload, &propalgation) + err = json.Unmarshal(resp.Payload, &propalgation) var dt *tools.DataType if propalgation.DataType > 0 { dtt := tools.DataType(propalgation.DataType) @@ -114,30 +50,41 @@ func ListenNATS(n *Node) { if propalgation.Action == tools.PB_MINIO_CONFIG { proto = stream.ProtocolMinioConfigResource } - if err := json.Unmarshal(resp.Payload, &m); err == nil { + if err := json.Unmarshal(propalgation.Payload, &m); err == nil { peers, _ := n.GetPeerRecord(context.Background(), m.PeerID) for _, p := range peers { n.StreamService.PublishCommon(&resp.Datatype, resp.User, resp.Groups, - p.PeerID, proto, resp.Payload) + p.PeerID, proto, propalgation.Payload) } } case tools.PB_CREATE, tools.PB_UPDATE, tools.PB_DELETE: - fmt.Println(n.StreamService.ToPartnerPublishEvent( - context.Background(), - propalgation.Action, - dt, resp.User, resp.Groups, - propalgation.Payload, - )) + if slices.Contains([]tools.DataType{tools.BOOKING, tools.PURCHASE_RESOURCE}, resp.Datatype) { + m := map[string]interface{}{} + if err := json.Unmarshal(propalgation.Payload, &m); err == nil { + if m["peer_id"] != nil { + n.StreamService.PublishCommon(&resp.Datatype, resp.User, resp.Groups, + fmt.Sprintf("%v", m["peer_id"]), stream.ProtocolCreateResource, propalgation.Payload) + } + } + } else { + fmt.Println(n.StreamService.ToPartnerPublishEvent( + context.Background(), + propalgation.Action, + dt, resp.User, resp.Groups, + propalgation.Payload, + )) + } case tools.PB_CONSIDERS: switch resp.Datatype { case tools.BOOKING, tools.PURCHASE_RESOURCE, tools.WORKFLOW_EXECUTION: var m executionConsidersPayload - if err := json.Unmarshal(resp.Payload, &m); err == nil { + if err := json.Unmarshal(propalgation.Payload, &m); err == nil { for _, p := range m.PeerIDs { peers, _ := n.GetPeerRecord(context.Background(), p) for _, pp := range peers { + fmt.Println(n.PeerID.String(), pp, string(propalgation.Payload)) n.StreamService.PublishCommon(&resp.Datatype, resp.User, resp.Groups, - pp.PeerID, stream.ProtocolConsidersResource, resp.Payload) + pp.PeerID, stream.ProtocolConsidersResource, propalgation.Payload) } } } @@ -156,10 +103,11 @@ func ListenNATS(n *Node) { } case tools.PB_PLANNER: m := map[string]interface{}{} - if err := json.Unmarshal(resp.Payload, &m); err == nil { + if err := json.Unmarshal(propalgation.Payload, &m); err == nil { + fmt.Println("PLAN", m) b := []byte{} if len(m) > 1 { - b = resp.Payload + b = propalgation.Payload } if m["peer_id"] == nil { // send to every active stream n.StreamService.Mu.Lock() @@ -168,10 +116,10 @@ func ListenNATS(n *Node) { n.StreamService.PublishCommon(nil, resp.User, resp.Groups, pid.String(), stream.ProtocolSendPlanner, b) } } + n.StreamService.Mu.Unlock() } else { n.StreamService.PublishCommon(nil, resp.User, resp.Groups, fmt.Sprintf("%v", m["peer_id"]), stream.ProtocolSendPlanner, b) } - n.StreamService.Mu.Unlock() } case tools.PB_CLOSE_PLANNER: m := map[string]interface{}{} @@ -212,6 +160,7 @@ func ListenNATS(n *Node) { } else { m := map[string]interface{}{} if err := json.Unmarshal(propalgation.Payload, &m); err == nil { + fmt.Println("SEARCH", dt, fmt.Sprintf("%v", m["type"]), fmt.Sprintf("%v", m["type"])) n.PubSubService.SearchPublishEvent( context.Background(), dt, diff --git a/daemons/node/node.go b/daemons/node/node.go index bb37531..41ebc6e 100644 --- a/daemons/node/node.go +++ b/daemons/node/node.go @@ -129,6 +129,37 @@ func InitNode(isNode bool, isIndexer bool) (*Node, error) { if node.StreamService, err = stream.InitStream(context.Background(), node.Host, node.PeerID, 1000, node); err != nil { panic(err) } + node.StreamService.IsPeerKnown = func(pid pp.ID) bool { + // 1. Local DB: known peer (handles blacklist). + access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil) + results := access.Search(&dbs.Filters{ + And: map[string][]dbs.Filter{ + "peer_id": {{Operator: dbs.EQUAL.String(), Value: pid.String()}}, + }, + }, pid.String(), false) + for _, item := range results.Data { + p, ok := item.(*peer.Peer) + if !ok || p.PeerID != pid.String() { + continue + } + return p.Relation != peer.BLACKLIST + } + // 2. Ask a connected indexer → DHT lookup by peer_id. + for _, addr := range common.Indexers.GetAddrs() { + ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second) + s, err := h.NewStream(ctx, addr.Info.ID, common.ProtocolGet) + cancel() + if err != nil { + continue + } + json.NewEncoder(s).Encode(indexer.GetValue{PeerID: pid.String()}) + var resp indexer.GetResponse + json.NewDecoder(s).Decode(&resp) + s.Reset() + return resp.Found + } + return false + } if node.PubSubService, err = pubsub.InitPubSub(context.Background(), node.Host, node.PS, node, node.StreamService); err != nil { panic(err) @@ -144,7 +175,7 @@ func InitNode(isNode bool, isIndexer bool) (*Node, error) { } } logger.Info().Msg("subscribe to decentralized search flow...") - node.SubscribeToSearch(node.PS, &f) + go node.SubscribeToSearch(node.PS, &f) logger.Info().Msg("connect to NATS") go ListenNATS(node) logger.Info().Msg("Node is actually running.") @@ -201,7 +232,6 @@ func (d *Node) publishPeerRecord( // Results are pushed to onResult as they arrive; the function returns when // the stream closes (idle timeout, explicit cancel, or indexer unreachable). func (d *Node) SearchPeerRecord(userKey, needle string, onResult func(common.SearchHit)) { - fmt.Println("SearchPeerRecord", needle) logger := oclib.GetLogger() idleTimeout := common.SearchIdleTimeout() @@ -240,6 +270,7 @@ func (d *Node) SearchPeerRecord(userKey, needle string, onResult func(common.Sea <-ctx.Done() s.SetReadDeadline(time.Now()) }() + seen := map[string]struct{}{} dec := json.NewDecoder(s) for { var result common.SearchPeerResult @@ -251,6 +282,14 @@ func (d *Node) SearchPeerRecord(userKey, needle string, onResult func(common.Sea } d.peerSearches.ResetIdle(searchKey) for _, hit := range result.Records { + key := hit.PeerID + if key == "" { + key = hit.DID + } + if _, already := seen[key]; already { + continue + } + seen[key] = struct{}{} onResult(hit) } } diff --git a/daemons/node/stream/handler.go b/daemons/node/stream/handler.go index 485a5a2..29a3cf5 100644 --- a/daemons/node/stream/handler.go +++ b/daemons/node/stream/handler.go @@ -86,6 +86,7 @@ func (abs *StreamService) verifyResponse(event *common.Event) error { // } func (abs *StreamService) sendPlanner(event *common.Event) error { // + fmt.Println("sendPlanner", len(event.Payload)) if len(event.Payload) == 0 { if plan, err := planner.GenerateShallow(&tools.APIRequest{Admin: true}); err == nil { if b, err := json.Marshal(plan); err == nil { @@ -166,8 +167,6 @@ func (ps *StreamService) handleEventFromPartner(evt *common.Event, protocol stri }, evt.From, false) if len(peers.Data) > 0 { p := peers.Data[0].(*peer.Peer) - fmt.Println(evt.From, p.GetID(), peers.Data) - ps.SendResponse(p, evt, fmt.Sprintf("%v", search)) } else if p, err := ps.Node.GetPeerRecord(context.Background(), evt.From); err == nil && len(p) > 0 { // peer from is peerID ps.SendResponse(p[0], evt, fmt.Sprintf("%v", search)) @@ -182,7 +181,7 @@ func (ps *StreamService) handleEventFromPartner(evt *common.Event, protocol stri }) } case ProtocolCreateResource, ProtocolUpdateResource: - fmt.Println("RECEIVED Protocol.Update") + fmt.Println("RECEIVED Protocol.Update", string(evt.Payload)) go tools.NewNATSCaller().SetNATSPub(tools.CREATE_RESOURCE, tools.NATSResponse{ FromApp: "oc-discovery", Datatype: tools.DataType(evt.DataType), @@ -218,13 +217,10 @@ func (abs *StreamService) SendResponse(p *peer.Peer, event *common.Event, search } else { for _, dt := range dts { access := oclib.NewRequestAdmin(oclib.LibDataEnum(dt), nil) - peerID := p.GetID() searched := access.Search(abs.FilterPeer(self.GetID(), event.Groups, search), "", false) - fmt.Println("SEND SEARCH_EVENT", self.GetID(), dt, len(searched.Data), peerID) for _, ss := range searched.Data { if j, err := json.Marshal(ss); err == nil { - _, err := abs.PublishCommon(&dt, event.User, event.Groups, p.PeerID, ProtocolSearchResource, j) - fmt.Println("Publish ERR", err) + abs.PublishCommon(&dt, event.User, event.Groups, p.PeerID, ProtocolSearchResource, j) } } } diff --git a/daemons/node/stream/publish.go b/daemons/node/stream/publish.go index a5115f9..db3d932 100644 --- a/daemons/node/stream/publish.go +++ b/daemons/node/stream/publish.go @@ -26,7 +26,7 @@ func (ps *StreamService) PublishesCommon(dt *tools.DataType, user string, groups for _, pes := range p.Data { for _, proto := range protos { if _, err := ps.PublishCommon(dt, user, groups, pes.(*peer.Peer).PeerID, proto, resource); err != nil { - return err + continue } } } @@ -57,7 +57,6 @@ func (ps *StreamService) PublishCommon(dt *tools.DataType, user string, groups [ if err != nil { return nil, err } - fmt.Println("WRITE") return ps.write(toPeerID, ad, dt, user, resource, proto) } return nil, errors.New("peer unvalid " + toPeerID) @@ -131,21 +130,17 @@ func (s *StreamService) write( return nil, errors.New("no stream available for protocol " + fmt.Sprintf("%v", proto) + " from PID " + peerID.ID.String()) } - if self, err := oclib.GetMySelf(); err != nil { - return nil, err - } else { - stream := s.Streams[proto][peerID.ID] - evt := common.NewEvent(string(proto), self.PeerID, dt, user, payload) - fmt.Println("SEND EVENT ", peerID, proto, evt.From, evt.DataType, evt.Timestamp) - if err := json.NewEncoder(stream.Stream).Encode(evt); err != nil { - stream.Stream.Close() - logger.Err(err) - return nil, err - } - if protocolInfo, ok := protocols[proto]; ok && protocolInfo.WaitResponse { - go s.readLoop(stream, peerID.ID, proto, &common.ProtocolInfo{PersistantStream: true}) - } - return stream, nil - } + stream := s.Streams[proto][peerID.ID] + evt := common.NewEvent(string(proto), s.Host.ID().String(), dt, user, payload) + fmt.Println("SEND EVENT ", peerID, proto, evt.From, evt.DataType, evt.Timestamp) + if err := json.NewEncoder(stream.Stream).Encode(evt); err != nil { + stream.Stream.Close() + logger.Err(err) + return nil, err + } + if protocolInfo, ok := protocols[proto]; ok && protocolInfo.WaitResponse { + go s.readLoop(stream, peerID.ID, proto, &common.ProtocolInfo{PersistantStream: true}) + } + return stream, nil } diff --git a/daemons/node/stream/service.go b/daemons/node/stream/service.go index d4a99cf..c1fd9a8 100644 --- a/daemons/node/stream/service.go +++ b/daemons/node/stream/service.go @@ -51,13 +51,16 @@ var protocolsPartners = map[protocol.ID]*common.ProtocolInfo{ } type StreamService struct { - Key pp.ID - Host host.Host - Node common.DiscoveryPeer - Streams common.ProtocolStream - maxNodesConn int - Mu sync.RWMutex + Key pp.ID + Host host.Host + Node common.DiscoveryPeer + Streams common.ProtocolStream + maxNodesConn int + Mu sync.RWMutex ResourceSearches *common.SearchTracker + // IsPeerKnown, when set, is called at stream open for every inbound protocol. + // Return false to reset the stream immediately. Left nil until wired by the node. + IsPeerKnown func(pid pp.ID) bool } func InitStream(ctx context.Context, h host.Host, key pp.ID, maxNode int, node common.DiscoveryPeer) (*StreamService, error) { @@ -71,7 +74,7 @@ func InitStream(ctx context.Context, h host.Host, key pp.ID, maxNode int, node c ResourceSearches: common.NewSearchTracker(), } for proto := range protocols { - service.Host.SetStreamHandler(proto, service.HandleResponse) + service.Host.SetStreamHandler(proto, service.gate(service.HandleResponse)) } logger.Info().Msg("connect to partners...") service.connectToPartners() // we set up a stream @@ -79,6 +82,21 @@ func InitStream(ctx context.Context, h host.Host, key pp.ID, maxNode int, node c return service, nil } +// gate wraps a stream handler with IsPeerKnown validation. +// If the peer is unknown the entire connection is closed and the handler is not called. +// IsPeerKnown is read at stream-open time so it works even when set after InitStream. +func (s *StreamService) gate(h func(network.Stream)) func(network.Stream) { + return func(stream network.Stream) { + if s.IsPeerKnown != nil && !s.IsPeerKnown(stream.Conn().RemotePeer()) { + logger := oclib.GetLogger() + logger.Warn().Str("peer", stream.Conn().RemotePeer().String()).Msg("[stream] unknown peer, closing connection") + stream.Conn().Close() + return + } + h(stream) + } +} + func (s *StreamService) HandleResponse(stream network.Stream) { s.Mu.Lock() defer s.Mu.Unlock() @@ -119,7 +137,7 @@ func (s *StreamService) connectToPartners() error { go s.readLoop(s.Streams[proto][ss.Conn().RemotePeer()], ss.Conn().RemotePeer(), proto, info) } logger.Info().Msg("SetStreamHandler " + string(proto)) - s.Host.SetStreamHandler(proto, f) + s.Host.SetStreamHandler(proto, s.gate(f)) } return nil } diff --git a/docker_discovery3.json b/docker_discovery3.json index 89649bc..0b62256 100644 --- a/docker_discovery3.json +++ b/docker_discovery3.json @@ -4,5 +4,6 @@ "NATS_URL": "nats://nats:4222", "NODE_MODE": "node", "NODE_ENDPOINT_PORT": 4003, + "NAME": "opencloud-demo-1", "INDEXER_ADDRESSES": "/ip4/172.40.0.2/tcp/4002/p2p/12D3KooWC3GNStak8KCYtJq11Dxiq45EJV53z1ZvKetMcZBeBX6u" } \ No newline at end of file diff --git a/docker_discovery4.json b/docker_discovery4.json index e7edac1..d6e10cb 100644 --- a/docker_discovery4.json +++ b/docker_discovery4.json @@ -4,5 +4,6 @@ "NATS_URL": "nats://nats:4222", "NODE_MODE": "node", "NODE_ENDPOINT_PORT": 4004, + "NAME": "opencloud-demo-2", "INDEXER_ADDRESSES": "/ip4/172.40.0.1/tcp/4001/p2p/12D3KooWGn3j4XqTSrjJDGGpTQERdDV5TPZdhQp87rAUnvQssvQu" }