diff --git a/daemons/node/common/search_tracker.go b/daemons/node/common/search_tracker.go
new file mode 100644
index 0000000..7b0a830
--- /dev/null
+++ b/daemons/node/common/search_tracker.go
@@ -0,0 +1,94 @@
+package common
+
+import (
+	"context"
+	"oc-discovery/conf"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/google/uuid"
+)
+
+// SearchIdleTimeout returns conf's SearchTimeout as a number of seconds, or 5s when that value is not positive.
+func SearchIdleTimeout() time.Duration {
+	if t := conf.GetConfig().SearchTimeout; t > 0 {
+		return time.Duration(t) * time.Second
+	}
+	return 5 * time.Second
+}
+
+// searchEntry holds the lifecycle state for one active search; entries are owned by a SearchTracker.
+type searchEntry struct {
+	cancel      context.CancelFunc // called when the entry is cancelled and removed
+	timer       *time.Timer        // idle timer; when it fires the search is cancelled
+	idleTimeout time.Duration      // duration the timer is re-armed with by ResetIdle
+}
+
+// SearchTracker keeps one active search per user (peer or resource), keyed by a
+// composite "user:searchID" so a replaced search's late results are recognisable.
+//
+// Typical usage:
+//
+//	ctx, cancel := context.WithCancel(parent)
+//	key := tracker.Register(userKey, cancel, idleTimeout)
+//	defer tracker.Cancel(key)
+//	// ... on each result: tracker.ResetIdle(key) + tracker.IsActive(key)
+type SearchTracker struct {
+	mu      sync.Mutex              // guards entries
+	entries map[string]*searchEntry // active searches, indexed by composite key
+}
+
+// NewSearchTracker returns a SearchTracker with an empty, ready-to-use entry map.
+func NewSearchTracker() *SearchTracker {
+	return &SearchTracker{entries: map[string]*searchEntry{}}
+}
+
+// Register starts a new search for baseUser, cancelling any previous one, and arms
+// its idle timer. It returns the composite key "baseUser:searchID" identifying it.
+func (t *SearchTracker) Register(baseUser string, cancel context.CancelFunc, idleTimeout time.Duration) string {
+	key := baseUser + ":" + uuid.New().String()
+	entry := &searchEntry{cancel: cancel, idleTimeout: idleTimeout}
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	t.cancelByPrefix(baseUser) // drop whatever this user was searching before
+	entry.timer = time.AfterFunc(idleTimeout, func() { t.Cancel(key) })
+	t.entries[key] = entry
+	return key
+}
+
+// Cancel stops the search(es) registered under user, given as a bare user key or a composite key.
+func (t *SearchTracker) Cancel(user string) {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	t.cancelByPrefix(user)
+}
+
+// ResetIdle re-arms the idle timer of compositeKey after a response arrives; unknown keys are ignored.
+func (t *SearchTracker) ResetIdle(compositeKey string) {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	if entry, found := t.entries[compositeKey]; found {
+		entry.timer.Reset(entry.idleTimeout)
+	}
+}
+
+// IsActive reports whether compositeKey is still registered as an active search.
+func (t *SearchTracker) IsActive(compositeKey string) bool {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	_, active := t.entries[compositeKey]
+	return active
+}
+
+// cancelByPrefix stops the timer, calls cancel and removes every entry whose key is
+// user itself or begins with "user:". The caller must already hold t.mu.
+func (t *SearchTracker) cancelByPrefix(user string) { + for k, e := range t.entries { + if k == user || strings.HasPrefix(k, user+":") { + e.timer.Stop() + e.cancel() + delete(t.entries, k) + } + } +} diff --git a/daemons/node/nats.go b/daemons/node/nats.go index 969293e..1ef905e 100644 --- a/daemons/node/nats.go +++ b/daemons/node/nats.go @@ -105,7 +105,7 @@ func ListenNATS(n *Node) { dtt := tools.DataType(propalgation.DataType) dt = &dtt } - fmt.Println("PROPALGATION ACT", propalgation.Action, propalgation.Action == tools.PB_CREATE, err) + fmt.Println("PROPALGATION ACT", propalgation.DataType, propalgation.Action, propalgation.Action == tools.PB_CREATE, err) if err == nil { switch propalgation.Action { case tools.PB_ADMIRALTY_CONFIG, tools.PB_MINIO_CONFIG: @@ -122,7 +122,6 @@ func ListenNATS(n *Node) { } } case tools.PB_CREATE, tools.PB_UPDATE, tools.PB_DELETE: - fmt.Println(propalgation.Action, dt, resp.User, propalgation.Payload) fmt.Println(n.StreamService.ToPartnerPublishEvent( context.Background(), propalgation.Action, @@ -186,6 +185,12 @@ func ListenNATS(n *Node) { } n.StreamService.Mu.Unlock() } + case tools.PB_CLOSE_SEARCH: + if propalgation.DataType == int(tools.PEER) { + n.peerSearches.Cancel(resp.User) + } else { + n.StreamService.ResourceSearches.Cancel(resp.User) + } case tools.PB_SEARCH: if propalgation.DataType == int(tools.PEER) { m := map[string]interface{}{} diff --git a/daemons/node/node.go b/daemons/node/node.go index c8967ab..bb37531 100644 --- a/daemons/node/node.go +++ b/daemons/node/node.go @@ -27,12 +27,6 @@ import ( "github.com/libp2p/go-libp2p/p2p/security/noise" ) -// activeSearch tracks an in-flight distributed peer search for one user. 
-type activeSearch struct { - queryID string - cancel context.CancelFunc -} - type Node struct { *common.LongLivedStreamRecordedService[interface{}] // change type of stream PS *pubsubs.PubSub @@ -43,9 +37,8 @@ type Node struct { isIndexer bool peerRecord *indexer.PeerRecord - // activeSearches: one streaming search per user; new search cancels previous. - activeSearchesMu sync.Mutex - activeSearches map[string]*activeSearch + // peerSearches: one active peer search per user; new search cancels previous. + peerSearches *common.SearchTracker Mu sync.RWMutex } @@ -85,7 +78,7 @@ func InitNode(isNode bool, isIndexer bool) (*Node, error) { PeerID: h.ID(), isIndexer: isIndexer, LongLivedStreamRecordedService: common.NewStreamRecordedService[interface{}](h, 1000), - activeSearches: map[string]*activeSearch{}, + peerSearches: common.NewSearchTracker(), } // Register the bandwidth probe handler so any peer measuring this node's // throughput can open a dedicated probe stream and read the echo. @@ -204,32 +197,21 @@ func (d *Node) publishPeerRecord( } // SearchPeerRecord starts a distributed peer search via ProtocolSearchPeer. -// userKey identifies the requesting user — a new call cancels any previous -// search for the same user. Results are pushed to onResult as they arrive. -// The function returns when the search stream closes (idle timeout or indexer unreachable). +// A new call for the same userKey cancels any previous search. +// Results are pushed to onResult as they arrive; the function returns when +// the stream closes (idle timeout, explicit cancel, or indexer unreachable). func (d *Node) SearchPeerRecord(userKey, needle string, onResult func(common.SearchHit)) { + fmt.Println("SearchPeerRecord", needle) logger := oclib.GetLogger() + idleTimeout := common.SearchIdleTimeout() ctx, cancel := context.WithCancel(context.Background()) + // Register cancels any previous search for userKey and starts the idle timer. 
+ // The composite key doubles as QueryID so the indexer echoes it back. + searchKey := d.peerSearches.Register(userKey, cancel, idleTimeout) + defer d.peerSearches.Cancel(searchKey) - d.activeSearchesMu.Lock() - if prev, ok := d.activeSearches[userKey]; ok { - prev.cancel() - } - queryID := uuid.New().String() - d.activeSearches[userKey] = &activeSearch{queryID: queryID, cancel: cancel} - d.activeSearchesMu.Unlock() - - defer func() { - cancel() - d.activeSearchesMu.Lock() - if cur, ok := d.activeSearches[userKey]; ok && cur.queryID == queryID { - delete(d.activeSearches, userKey) - } - d.activeSearchesMu.Unlock() - }() - - req := common.SearchPeerRequest{QueryID: queryID} + req := common.SearchPeerRequest{QueryID: searchKey} if pid, err := pp.Decode(needle); err == nil { req.PeerID = pid.String() } else if _, err := uuid.Parse(needle); err == nil { @@ -238,7 +220,6 @@ func (d *Node) SearchPeerRecord(userKey, needle string, onResult func(common.Sea req.Name = needle } - // Try indexers in pool order until one accepts the stream. for _, ad := range common.Indexers.GetAddrs() { if ad.Info == nil { continue @@ -253,15 +234,22 @@ func (d *Node) SearchPeerRecord(userKey, needle string, onResult func(common.Sea s.Reset() continue } + // Interrupt the blocking Decode as soon as the context is cancelled + // (idle timer, explicit PB_CLOSE_SEARCH, or replacement search). 
+ go func() { + <-ctx.Done() + s.SetReadDeadline(time.Now()) + }() dec := json.NewDecoder(s) for { var result common.SearchPeerResult if err := dec.Decode(&result); err != nil { break } - if result.QueryID != queryID { - continue // stale response from a previous query + if result.QueryID != searchKey || !d.peerSearches.IsActive(searchKey) { + break } + d.peerSearches.ResetIdle(searchKey) for _, hit := range result.Records { onResult(hit) } diff --git a/daemons/node/pubsub/publish.go b/daemons/node/pubsub/publish.go index 817dd28..e820570 100644 --- a/daemons/node/pubsub/publish.go +++ b/daemons/node/pubsub/publish.go @@ -4,9 +4,11 @@ import ( "context" "encoding/json" "errors" + "oc-discovery/conf" "oc-discovery/daemons/node/common" "oc-discovery/daemons/node/stream" "oc-discovery/models" + "time" "cloud.o-forge.io/core/oc-lib/dbs" "cloud.o-forge.io/core/oc-lib/models/peer" @@ -33,7 +35,18 @@ func (ps *PubSubService) SearchPublishEvent( if err != nil { return err } - return ps.publishEvent(ctx, dt, tools.PB_SEARCH, common.TopicPubSubSearch, user, b) + idleTimeout := func() time.Duration { + if t := conf.GetConfig().SearchTimeout; t > 0 { + return time.Duration(t) * time.Second + } + return 5 * time.Second + }() + searchCtx, cancel := context.WithCancel(ctx) + // Register cancels any previous search for this user and starts the idle timer. + // The returned composite key is used as User in the GossipSub event so that + // remote peers echo it back unchanged, allowing IsActive to validate results. 
+ searchKey := ps.StreamService.ResourceSearches.Register(user, cancel, idleTimeout) + return ps.publishEvent(searchCtx, dt, tools.PB_SEARCH, common.TopicPubSubSearch, searchKey, b) default: return errors.New("no type of research found") } diff --git a/daemons/node/stream/handler.go b/daemons/node/stream/handler.go index 8fcd438..485a5a2 100644 --- a/daemons/node/stream/handler.go +++ b/daemons/node/stream/handler.go @@ -114,10 +114,15 @@ func (abs *StreamService) sendPlanner(event *common.Event) error { // } func (abs *StreamService) retrieveResponse(event *common.Event) error { // + if !abs.ResourceSearches.IsActive(event.User) { + return nil // search already closed or timed out + } res, err := resources.ToResource(int(event.DataType), event.Payload) if err != nil || res == nil { return nil } + // A response arrived — reset the idle timeout. + abs.ResourceSearches.ResetIdle(event.User) b, err := json.Marshal(res.Serialize(res)) go tools.NewNATSCaller().SetNATSPub(tools.SEARCH_EVENT, tools.NATSResponse{ FromApp: "oc-discovery", diff --git a/daemons/node/stream/service.go b/daemons/node/stream/service.go index 7ce16e6..d4a99cf 100644 --- a/daemons/node/stream/service.go +++ b/daemons/node/stream/service.go @@ -51,22 +51,24 @@ var protocolsPartners = map[protocol.ID]*common.ProtocolInfo{ } type StreamService struct { - Key pp.ID - Host host.Host - Node common.DiscoveryPeer - Streams common.ProtocolStream - maxNodesConn int - Mu sync.RWMutex + Key pp.ID + Host host.Host + Node common.DiscoveryPeer + Streams common.ProtocolStream + maxNodesConn int + Mu sync.RWMutex + ResourceSearches *common.SearchTracker } func InitStream(ctx context.Context, h host.Host, key pp.ID, maxNode int, node common.DiscoveryPeer) (*StreamService, error) { logger := oclib.GetLogger() service := &StreamService{ - Key: key, - Node: node, - Host: h, - Streams: common.ProtocolStream{}, - maxNodesConn: maxNode, + Key: key, + Node: node, + Host: h, + Streams: common.ProtocolStream{}, + 
maxNodesConn: maxNode, + ResourceSearches: common.NewSearchTracker(), } for proto := range protocols { service.Host.SetStreamHandler(proto, service.HandleResponse) diff --git a/go.mod b/go.mod index f85603d..afa7e56 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module oc-discovery go 1.25.0 require ( - cloud.o-forge.io/core/oc-lib v0.0.0-20260311072518-933b7147e908 + cloud.o-forge.io/core/oc-lib v0.0.0-20260312141150-a335c905b3a2 github.com/ipfs/go-cid v0.6.0 github.com/libp2p/go-libp2p v0.47.0 github.com/libp2p/go-libp2p-record v0.3.1 diff --git a/go.sum b/go.sum index 35f902e..61ebdd9 100644 --- a/go.sum +++ b/go.sum @@ -2,6 +2,10 @@ cloud.o-forge.io/core/oc-lib v0.0.0-20260304145747-e03a0d3dd0aa h1:1wCpI4dwN1pj6 cloud.o-forge.io/core/oc-lib v0.0.0-20260304145747-e03a0d3dd0aa/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= cloud.o-forge.io/core/oc-lib v0.0.0-20260311072518-933b7147e908 h1:1jz3xI/u2FzCG8phY7ShqADrmCj0mlrdjbdNUosSwgs= cloud.o-forge.io/core/oc-lib v0.0.0-20260311072518-933b7147e908/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= +cloud.o-forge.io/core/oc-lib v0.0.0-20260312073634-2c9c42dd516a h1:oCkb9l/Cvn0x6iicxIydrjfCNU+UHhKuklFgfzDa174= +cloud.o-forge.io/core/oc-lib v0.0.0-20260312073634-2c9c42dd516a/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= +cloud.o-forge.io/core/oc-lib v0.0.0-20260312141150-a335c905b3a2 h1:DuB6SDThFVJVQ0iI0pZnBqtCE0uW+SNI7R7ndKixu2k= +cloud.o-forge.io/core/oc-lib v0.0.0-20260312141150-a335c905b3a2/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/Masterminds/semver/v3 v3.4.0 h1:Zog+i5UMtVoCU8oKka5P7i9q9HgrJeGzI9SA1Xbatp0= github.com/Masterminds/semver/v3 v3.4.0/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM=