package indexer import ( "context" "encoding/json" "strings" "time" "oc-discovery/conf" "oc-discovery/daemons/node/common" oclib "cloud.o-forge.io/core/oc-lib" pp "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/network" ) const TopicSearchPeer = "oc-search-peer" // searchTimeout returns the configured search timeout, defaulting to 5s. func searchTimeout() time.Duration { if t := conf.GetConfig().SearchTimeout; t > 0 { return time.Duration(t) * time.Second } return 5 * time.Second } // initSearchHandlers registers ProtocolSearchPeer and ProtocolSearchPeerResponse // and subscribes to TopicSearchPeer on GossipSub. func (ix *IndexerService) initSearchHandlers() { ix.Host.SetStreamHandler(common.ProtocolSearchPeer, ix.handleSearchPeer) ix.Host.SetStreamHandler(common.ProtocolSearchPeerResponse, ix.handleSearchPeerResponse) ix.initSearchSubscription() } // updateReferent is called from HandleHeartbeat when Referent flag changes. // If referent=true the node is added to referencedNodes; if false it is removed. func (ix *IndexerService) updateReferent(pid pp.ID, rec PeerRecord, referent bool) { ix.referencedNodesMu.Lock() defer ix.referencedNodesMu.Unlock() if referent { ix.referencedNodes[pid] = rec } else { delete(ix.referencedNodes, pid) } } // searchReferenced looks up nodes in referencedNodes matching the query. // Matches on peerID (exact), DID (exact), or name (case-insensitive contains). func (ix *IndexerService) searchReferenced(peerID, did, name string) []common.SearchHit { ix.referencedNodesMu.RLock() defer ix.referencedNodesMu.RUnlock() nameLow := strings.ToLower(name) var hits []common.SearchHit for pid, rec := range ix.referencedNodes { pidStr := pid.String() matchPeerID := peerID != "" && pidStr == peerID matchDID := did != "" && rec.DID == did matchName := name != "" && strings.Contains(strings.ToLower(rec.Name), nameLow) if matchPeerID || matchDID || matchName { hits = append(hits, common.SearchHit{ PeerID: pidStr, DID: rec.DID, Name: rec.Name, }) } } return hits } // handleSearchPeer is the ProtocolSearchPeer handler. // The node opens this stream, sends a SearchPeerRequest, and reads results // as they stream in. The stream stays open until timeout or node closes it. func (ix *IndexerService) handleSearchPeer(s network.Stream) { logger := oclib.GetLogger() defer s.Reset() var req common.SearchPeerRequest if err := json.NewDecoder(s).Decode(&req); err != nil || req.QueryID == "" { return } // streamCtx is cancelled when the node closes its end of the stream. streamCtx, streamCancel := context.WithCancel(context.Background()) go func() { // Block until the stream is reset/closed, then cancel our context. buf := make([]byte, 1) s.Read(buf) //nolint:errcheck — we only care about EOF/reset streamCancel() }() defer streamCancel() resultCh := make(chan []common.SearchHit, 16) ix.pendingSearchesMu.Lock() ix.pendingSearches[req.QueryID] = resultCh ix.pendingSearchesMu.Unlock() defer func() { ix.pendingSearchesMu.Lock() delete(ix.pendingSearches, req.QueryID) ix.pendingSearchesMu.Unlock() }() // Check own referencedNodes immediately. if hits := ix.searchReferenced(req.PeerID, req.DID, req.Name); len(hits) > 0 { resultCh <- hits } // Broadcast search on GossipSub so other indexers can respond. ix.publishSearchQuery(req.QueryID, req.PeerID, req.DID, req.Name) // Stream results back to node as they arrive; reset idle timer on each result. enc := json.NewEncoder(s) idleTimer := time.NewTimer(searchTimeout()) defer idleTimer.Stop() for { select { case hits := <-resultCh: if err := enc.Encode(common.SearchPeerResult{QueryID: req.QueryID, Records: hits}); err != nil { logger.Debug().Err(err).Msg("[search] stream write failed") return } // Reset idle timeout: keep alive as long as results trickle in. if !idleTimer.Stop() { select { case <-idleTimer.C: default: } } idleTimer.Reset(searchTimeout()) case <-idleTimer.C: // No new result within timeout — close gracefully. return case <-streamCtx.Done(): // Node closed the stream (new search superseded this one). return } } } // handleSearchPeerResponse is the ProtocolSearchPeerResponse handler. // Another indexer opens this stream to deliver hits for a pending queryID. func (ix *IndexerService) handleSearchPeerResponse(s network.Stream) { defer s.Reset() var result common.SearchPeerResult if err := json.NewDecoder(s).Decode(&result); err != nil || result.QueryID == "" { return } ix.pendingSearchesMu.Lock() ch := ix.pendingSearches[result.QueryID] ix.pendingSearchesMu.Unlock() if ch != nil { select { case ch <- result.Records: default: // channel full, drop — node may be slow } } } // publishSearchQuery broadcasts a SearchQuery on TopicSearchPeer. func (ix *IndexerService) publishSearchQuery(queryID, peerID, did, name string) { ix.LongLivedStreamRecordedService.LongLivedPubSubService.PubsubMu.RLock() topic := ix.LongLivedStreamRecordedService.LongLivedPubSubService.LongLivedPubSubs[TopicSearchPeer] ix.LongLivedStreamRecordedService.LongLivedPubSubService.PubsubMu.RUnlock() if topic == nil { return } q := common.SearchQuery{ QueryID: queryID, PeerID: peerID, DID: did, Name: name, EmitterID: ix.Host.ID().String(), } b, err := json.Marshal(q) if err != nil { return } _ = topic.Publish(context.Background(), b) } // initSearchSubscription joins TopicSearchPeer and dispatches incoming queries. func (ix *IndexerService) initSearchSubscription() { logger := oclib.GetLogger() ix.LongLivedStreamRecordedService.LongLivedPubSubService.PubsubMu.Lock() topic, err := ix.PS.Join(TopicSearchPeer) if err != nil { ix.LongLivedStreamRecordedService.LongLivedPubSubService.PubsubMu.Unlock() logger.Err(err).Msg("[search] failed to join search topic") return } ix.LongLivedStreamRecordedService.LongLivedPubSubService.LongLivedPubSubs[TopicSearchPeer] = topic ix.LongLivedStreamRecordedService.LongLivedPubSubService.PubsubMu.Unlock() common.SubscribeEvents( ix.LongLivedStreamRecordedService.LongLivedPubSubService, context.Background(), TopicSearchPeer, -1, func(_ context.Context, q common.SearchQuery, _ string) { ix.onSearchQuery(q) }, ) } // onSearchQuery handles an incoming GossipSub search broadcast. // If we have matching referencedNodes, we respond to the emitting indexer. func (ix *IndexerService) onSearchQuery(q common.SearchQuery) { // Don't respond to our own broadcasts. if q.EmitterID == ix.Host.ID().String() { return } hits := ix.searchReferenced(q.PeerID, q.DID, q.Name) if len(hits) == 0 { return } emitterID, err := pp.Decode(q.EmitterID) if err != nil { return } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() s, err := ix.Host.NewStream(ctx, emitterID, common.ProtocolSearchPeerResponse) if err != nil { return } defer s.Reset() s.SetDeadline(time.Now().Add(5 * time.Second)) json.NewEncoder(s).Encode(common.SearchPeerResult{QueryID: q.QueryID, Records: hits}) }