package indexer

import (
	"context"
	"encoding/json"
	"errors"
	"math/rand"
	"strings"
	"sync"
	"time"

	"oc-discovery/conf"
	"oc-discovery/daemons/node/common"

	oclib "cloud.o-forge.io/core/oc-lib"
	dht "github.com/libp2p/go-libp2p-kad-dht"
	pubsub "github.com/libp2p/go-libp2p-pubsub"
	record "github.com/libp2p/go-libp2p-record"
	"github.com/libp2p/go-libp2p/core/host"
	pp "github.com/libp2p/go-libp2p/core/peer"
)

// dhtCacheEntry holds one indexer discovered via DHT for use in suggestion responses.
type dhtCacheEntry struct {
	AI       pp.AddrInfo
	LastSeen time.Time
}

// offloadState tracks which nodes we've already proposed migration to.
// When an indexer is overloaded (fill rate > offloadThreshold) it only sends
// SuggestMigrate to a small batch at a time; peers that don't migrate within
// offloadGracePeriod are moved to alreadyTried so a new batch can be picked.
type offloadState struct {
	inBatch      map[pp.ID]time.Time // peer → time added to current batch
	alreadyTried map[pp.ID]struct{}  // peers proposed to that didn't migrate
	mu           sync.Mutex
}

const (
	offloadThreshold   = 0.80 // fill rate above which to start offloading
	offloadBatchSize   = 5    // max concurrent "please migrate" proposals
	offloadGracePeriod = 3 * common.RecommendedHeartbeatInterval
)

// IndexerService manages the indexer node's state: stream records, DHT, pubsub.
type IndexerService struct {
	*common.LongLivedStreamRecordedService[PeerRecord]
	PS              *pubsub.PubSub
	DHT             *dht.IpfsDHT
	isStrictIndexer bool
	mu              sync.RWMutex
	// dhtProvideCancel stops ALL background goroutines started by this service
	// (DHT provide loop AND cache refresher): each starter chains the previous
	// cancel into the new one, so Close() only needs to call this once.
	dhtProvideCancel context.CancelFunc
	bornAt           time.Time

	// Passive DHT cache: refreshed every 2 min in background, used for suggestions.
	dhtCache   []dhtCacheEntry
	dhtCacheMu sync.RWMutex

	// Offload state for overloaded-indexer migration proposals.
	offload offloadState

	// referencedNodes holds nodes that have designated this indexer as their
	// search referent (Heartbeat.Referent=true). Used for distributed search.
	referencedNodes   map[pp.ID]PeerRecord
	referencedNodesMu sync.RWMutex

	// pendingSearches maps queryID → result channel for in-flight searches.
	pendingSearches   map[string]chan []common.SearchHit
	pendingSearchesMu sync.Mutex

	// behavior tracks per-node compliance (heartbeat rate, publish/get volume,
	// identity consistency, signature failures).
	behavior *NodeBehaviorTracker

	// connGuard limits new-connection bursts to protect public indexers.
	connGuard *ConnectionRateGuard
}

// NewIndexerService creates an IndexerService.
// If ps is nil, this is a strict indexer (no pre-existing gossip sub from a node).
// Returns nil if the DHT cannot be created.
func NewIndexerService(h host.Host, ps *pubsub.PubSub, maxNode int) *IndexerService {
	logger := oclib.GetLogger()
	logger.Info().Msg("open indexer mode...")
	var err error
	ix := &IndexerService{
		LongLivedStreamRecordedService: common.NewStreamRecordedService[PeerRecord](h, maxNode),
		isStrictIndexer:                ps == nil,
		referencedNodes:                map[pp.ID]PeerRecord{},
		pendingSearches:                map[string]chan []common.SearchHit{},
		behavior:                       newNodeBehaviorTracker(),
		connGuard:                      newConnectionRateGuard(),
	}
	if ps == nil {
		ps, err = pubsub.NewGossipSub(context.Background(), ix.Host)
		if err != nil {
			panic(err) // can't run your indexer without a propagation pubsub
		}
	}
	ix.PS = ps
	if ix.isStrictIndexer {
		logger.Info().Msg("connect to indexers as strict indexer...")
		common.ConnectToIndexers(h, conf.GetConfig().MinIndexer, conf.GetConfig().MaxIndexer*2)
		logger.Info().Msg("subscribe to decentralized search flow as strict indexer...")
		go ix.SubscribeToSearch(ix.PS, nil)
	}
	ix.LongLivedStreamRecordedService.AfterDelete = func(pid pp.ID, name, did string) {
		// Remove behavior state for peers that are no longer connected and
		// have no active ban — keeps memory bounded to the live node set.
		ix.behavior.Cleanup(pid)
	}
	// AllowInbound: fired once per stream open, before any heartbeat is decoded.
	// 1. Reject peers that are currently banned (behavioral strikes).
	// 2. For genuinely new connections, apply the burst guard.
	ix.AllowInbound = func(remotePeer pp.ID, isNew bool) error {
		if ix.behavior.IsBanned(remotePeer) {
			return errors.New("peer is banned")
		}
		if isNew && !ix.connGuard.Allow() {
			return errors.New("connection rate limit exceeded, retry later")
		}
		return nil
	}
	// ValidateHeartbeat: fired on every heartbeat tick for an established stream.
	// Checks heartbeat cadence — rejects if the node is sending too fast.
	ix.ValidateHeartbeat = func(remotePeer pp.ID) error {
		return ix.behavior.RecordHeartbeat(remotePeer)
	}
	// Parse bootstrap peers from configured indexer addresses so the DHT can
	// find its routing table entries even in a fresh deployment.
	var bootstrapPeers []pp.AddrInfo
	for _, addrStr := range strings.Split(conf.GetConfig().IndexerAddresses, ",") {
		addrStr = strings.TrimSpace(addrStr)
		if addrStr == "" {
			continue
		}
		if ad, err := pp.AddrInfoFromString(addrStr); err == nil {
			bootstrapPeers = append(bootstrapPeers, *ad)
		}
	}
	dhtOpts := []dht.Option{
		dht.Mode(dht.ModeServer),
		dht.ProtocolPrefix("oc"),
		dht.Validator(record.NamespacedValidator{
			"node": PeerRecordValidator{},
			"name": DefaultValidator{},
			"pid":  DefaultValidator{},
		}),
	}
	if len(bootstrapPeers) > 0 {
		dhtOpts = append(dhtOpts, dht.BootstrapPeers(bootstrapPeers...))
	}
	if ix.DHT, err = dht.New(context.Background(), ix.Host, dhtOpts...); err != nil {
		logger.Info().Msg(err.Error())
		return nil
	}
	// Make the DHT available for replenishment from other packages.
	common.SetDiscoveryDHT(ix.DHT)
	ix.bornAt = time.Now().UTC()
	ix.offload.inBatch = make(map[pp.ID]time.Time)
	ix.offload.alreadyTried = make(map[pp.ID]struct{})
	ix.initNodeHandler()
	// Build and send a HeartbeatResponse after each received node heartbeat.
	// Raw metrics only — no pre-cooked score. Node computes the score itself.
	ix.BuildHeartbeatResponse = func(remotePeer pp.ID, need int, challenges []string, challengeDID string, referent bool, rawRecord json.RawMessage) *common.HeartbeatResponse {
		ix.StreamMU.RLock()
		peerCount := len(ix.StreamRecords[common.ProtocolHeartbeat])
		// Collect lastSeen per active peer for challenge responses.
		type peerMeta struct {
			found    bool
			lastSeen time.Time
		}
		peerLookup := make(map[string]peerMeta, peerCount)
		var remotePeerRecord PeerRecord
		for pid, rec := range ix.StreamRecords[common.ProtocolHeartbeat] {
			var ls time.Time
			if rec.HeartbeatStream != nil && rec.HeartbeatStream.UptimeTracker != nil {
				ls = rec.HeartbeatStream.UptimeTracker.LastSeen
			}
			peerLookup[pid.String()] = peerMeta{found: true, lastSeen: ls}
			if pid == remotePeer {
				remotePeerRecord = rec.Record
			}
		}
		ix.StreamMU.RUnlock()
		// AfterHeartbeat updates srec.Record asynchronously — it may not have run yet.
		// Use rawRecord (the fresh signed PeerRecord embedded in the heartbeat) directly
		// so referencedNodes always gets the current Name/DID regardless of timing.
		if remotePeerRecord.Name == "" && len(rawRecord) > 0 {
			var fresh PeerRecord
			if json.Unmarshal(rawRecord, &fresh) == nil {
				remotePeerRecord = fresh
			}
		}
		// Update referent designation: node marks its best-scored indexer with Referent=true.
		ix.updateReferent(remotePeer, remotePeerRecord, referent)
		maxN := ix.MaxNodesConn()
		fillRate := 0.0
		if maxN > 0 {
			fillRate = float64(peerCount) / float64(maxN)
			if fillRate > 1 {
				fillRate = 1
			}
		}
		resp := &common.HeartbeatResponse{
			FillRate:  fillRate,
			PeerCount: peerCount,
			MaxNodes:  maxN,
			BornAt:    ix.bornAt,
		}
		// Answer each challenged PeerID with raw found + lastSeen.
		for _, pidStr := range challenges {
			meta := peerLookup[pidStr] // zero value if not found
			entry := common.ChallengeEntry{
				PeerID:   pidStr,
				Found:    meta.found,
				LastSeen: meta.lastSeen,
			}
			resp.Challenges = append(resp.Challenges, entry)
		}
		// DHT challenge: retrieve the node's own DID to prove DHT is functional.
		if challengeDID != "" {
			ctx3, cancel3 := context.WithTimeout(context.Background(), 3*time.Second)
			val, err := ix.DHT.GetValue(ctx3, "/node/"+challengeDID)
			cancel3()
			resp.DHTFound = err == nil
			if err == nil {
				resp.DHTPayload = json.RawMessage(val)
			}
		}
		// Random sample of connected nodes as witnesses (up to 3); map iteration
		// order supplies the randomness. Never include the requesting peer itself —
		// asking a node to witness itself is circular and meaningless.
		ix.StreamMU.RLock()
		for pidStr := range peerLookup {
			if len(resp.Witnesses) >= 3 {
				break
			}
			pid, err := pp.Decode(pidStr)
			if err != nil || pid == remotePeer || pid == ix.Host.ID() {
				continue
			}
			addrs := ix.Host.Peerstore().Addrs(pid)
			ai := common.FilterLoopbackAddrs(pp.AddrInfo{ID: pid, Addrs: addrs})
			if len(ai.Addrs) > 0 {
				resp.Witnesses = append(resp.Witnesses, ai)
			}
		}
		ix.StreamMU.RUnlock()
		// Attach suggestions: exactly `need` entries from the DHT cache.
		// If the indexer is overloaded (SuggestMigrate will be set below), always
		// provide at least 1 suggestion even when need == 0, so the node has
		// somewhere to go.
		suggestionsNeeded := need
		if fillRate > offloadThreshold && suggestionsNeeded < 1 {
			suggestionsNeeded = 1
		}
		if suggestionsNeeded > 0 {
			ix.dhtCacheMu.RLock()
			// When offloading, pick from a random offset within the top N of the
			// cache so concurrent migrations spread across multiple targets rather
			// than all rushing to the same least-loaded indexer (thundering herd).
			// For normal need-based suggestions the full sorted order is fine.
			cache := ix.dhtCache
			if fillRate > offloadThreshold && len(cache) > suggestionsNeeded {
				const spreadWindow = 5 // sample from the top-5 least-loaded
				window := spreadWindow
				if window > len(cache) {
					window = len(cache)
				}
				start := rand.Intn(window)
				cache = cache[start:]
			}
			for _, e := range cache {
				if len(resp.Suggestions) >= suggestionsNeeded {
					break
				}
				// Never suggest the requesting peer itself or this indexer.
				if e.AI.ID == remotePeer || e.AI.ID == h.ID() {
					continue
				}
				resp.Suggestions = append(resp.Suggestions, e.AI)
			}
			ix.dhtCacheMu.RUnlock()
		}
		// Offload logic: when fill rate is too high, selectively ask nodes to migrate.
		if fillRate > offloadThreshold && len(resp.Suggestions) > 0 {
			now := time.Now()
			ix.offload.mu.Lock()
			// Expire stale batch entries -> move to alreadyTried.
			for pid, addedAt := range ix.offload.inBatch {
				if now.Sub(addedAt) > offloadGracePeriod {
					ix.offload.alreadyTried[pid] = struct{}{}
					delete(ix.offload.inBatch, pid)
				}
			}
			// Reset alreadyTried if we've exhausted the whole pool.
			if len(ix.offload.alreadyTried) >= peerCount {
				ix.offload.alreadyTried = make(map[pp.ID]struct{})
			}
			_, tried := ix.offload.alreadyTried[remotePeer]
			_, inBatch := ix.offload.inBatch[remotePeer]
			if !tried {
				if inBatch {
					resp.SuggestMigrate = true
				} else if len(ix.offload.inBatch) < offloadBatchSize {
					ix.offload.inBatch[remotePeer] = now
					resp.SuggestMigrate = true
				}
			}
			ix.offload.mu.Unlock()
		} else if fillRate <= offloadThreshold {
			// Fill rate back to normal: reset offload state.
			ix.offload.mu.Lock()
			if len(ix.offload.inBatch) > 0 || len(ix.offload.alreadyTried) > 0 {
				ix.offload.inBatch = make(map[pp.ID]time.Time)
				ix.offload.alreadyTried = make(map[pp.ID]struct{})
			}
			ix.offload.mu.Unlock()
		}
		// Bootstrap: if this indexer has no indexers of its own, probe the
		// connecting peer to check it supports ProtocolHeartbeat (i.e. it is
		// itself an indexer). Plain nodes do not register the handler and the
		// negotiation fails instantly — no wasted heartbeat cycle.
		// Run in a goroutine: the probe is a short blocking stream open.
		if len(common.Indexers.GetAddrs()) == 0 && remotePeer != h.ID() {
			pid := remotePeer
			go func() {
				if !common.SupportsHeartbeat(h, pid) {
					logger.Debug().Str("peer", pid.String()).
						Msg("[bootstrap] inbound peer has no heartbeat handler — not an indexer, skipping")
					return
				}
				addrs := h.Peerstore().Addrs(pid)
				ai := common.FilterLoopbackAddrs(pp.AddrInfo{ID: pid, Addrs: addrs})
				if len(ai.Addrs) == 0 {
					return
				}
				key := pid.String()
				if !common.Indexers.ExistsAddr(key) {
					adCopy := ai
					common.Indexers.SetAddr(key, &adCopy)
					common.Indexers.NudgeIt()
					logger.Info().Str("peer", key).Msg("[bootstrap] no indexers — added inbound indexer peer as candidate")
				}
			}()
		}
		return resp
	}
	// Advertise this indexer in the DHT so nodes can discover it.
	fillRateFn := func() float64 {
		ix.StreamMU.RLock()
		n := len(ix.StreamRecords[common.ProtocolHeartbeat])
		ix.StreamMU.RUnlock()
		maxN := ix.MaxNodesConn()
		if maxN <= 0 {
			return 0
		}
		rate := float64(n) / float64(maxN)
		if rate > 1 {
			rate = 1
		}
		return rate
	}
	ix.startDHTCacheRefresh()
	ix.startDHTProvide(fillRateFn)
	return ix
}

// startDHTCacheRefresh periodically queries the DHT for peer indexers and
// refreshes ix.dhtCache. This passive cache is used by BuildHeartbeatResponse
// to suggest better indexers to connected nodes without any per-request cost.
func (ix *IndexerService) startDHTCacheRefresh() {
	ctx, cancel := context.WithCancel(context.Background())
	// Chain with any previously stored cancel so Close() stops every background
	// goroutine with a single call, regardless of which starter ran first.
	prevCancel := ix.dhtProvideCancel
	ix.dhtProvideCancel = func() {
		if prevCancel != nil {
			prevCancel()
		}
		cancel()
	}
	go func() {
		logger := oclib.GetLogger()
		refresh := func() {
			if ix.DHT == nil {
				return
			}
			// Fetch more than needed so SelectByFillRate can filter for diversity.
			raw := common.DiscoverIndexersFromDHT(ix.Host, ix.DHT, 30)
			if len(raw) == 0 {
				return
			}
			// Remove self before selection (in-place filter reuses raw's backing array).
			filtered := raw[:0]
			for _, ai := range raw {
				if ai.ID != ix.Host.ID() {
					filtered = append(filtered, ai)
				}
			}
			// SelectByFillRate applies /24 subnet diversity and fill-rate weighting.
			// Fill rates are unknown at this stage (nil map) so all peers get
			// the neutral prior f=0.5 — diversity filtering still applies.
			selected := common.SelectByFillRate(filtered, nil, 10)
			now := time.Now()
			ix.dhtCacheMu.Lock()
			ix.dhtCache = ix.dhtCache[:0]
			for _, ai := range selected {
				ix.dhtCache = append(ix.dhtCache, dhtCacheEntry{AI: ai, LastSeen: now})
			}
			ix.dhtCacheMu.Unlock()
			logger.Info().Int("cached", len(selected)).Msg("[dht] indexer suggestion cache refreshed")
		}
		// Initial delay: let the DHT routing table warm up first.
		select {
		case <-time.After(30 * time.Second):
		case <-ctx.Done():
			return
		}
		refresh()
		t := time.NewTicker(2 * time.Minute)
		defer t.Stop()
		for {
			select {
			case <-t.C:
				refresh()
			case <-ctx.Done():
				return
			}
		}
	}()
}

// startDHTProvide bootstraps the DHT and starts a goroutine that periodically
// advertises this indexer under the well-known provider key.
func (ix *IndexerService) startDHTProvide(fillRateFn func() float64) {
	ctx, cancel := context.WithCancel(context.Background())
	// BUGFIX: previously this assigned `ix.dhtProvideCancel = cancel` directly,
	// clobbering the chained cancel installed by startDHTCacheRefresh (which runs
	// first in NewIndexerService) — so Close() leaked the cache-refresh goroutine.
	// Chain the previous cancel exactly like startDHTCacheRefresh does.
	prevCancel := ix.dhtProvideCancel
	ix.dhtProvideCancel = func() {
		if prevCancel != nil {
			prevCancel()
		}
		cancel()
	}
	go func() {
		logger := oclib.GetLogger()
		// Wait until a routable (non-loopback) address is available.
		// NOTE(review): only the last advertised address is inspected here —
		// confirm this heuristic matches how the host orders its addresses.
		for i := 0; i < 12; i++ {
			addrs := ix.Host.Addrs()
			if len(addrs) > 0 && !strings.Contains(addrs[len(addrs)-1].String(), "127.0.0.1") {
				break
			}
			select {
			case <-ctx.Done():
				return
			case <-time.After(5 * time.Second):
			}
		}
		if err := ix.DHT.Bootstrap(ctx); err != nil {
			logger.Warn().Err(err).Msg("[dht] bootstrap failed")
		}
		provide := func() {
			pCtx, pCancel := context.WithTimeout(ctx, 30*time.Second)
			defer pCancel()
			if err := ix.DHT.Provide(pCtx, common.IndexerCID(), true); err != nil {
				logger.Warn().Err(err).Msg("[dht] Provide failed")
			} else {
				logger.Info().Float64("fill_rate", fillRateFn()).Msg("[dht] indexer advertised in DHT")
			}
		}
		provide()
		t := time.NewTicker(common.RecommendedHeartbeatInterval)
		defer t.Stop()
		for {
			select {
			case <-t.C:
				provide()
			case <-ctx.Done():
				return
			}
		}
	}()
}

// Close stops all background goroutines (via the chained cancel), shuts down
// the DHT, unregisters the search topic validator, and closes every live
// heartbeat stream.
func (ix *IndexerService) Close() {
	if ix.dhtProvideCancel != nil {
		ix.dhtProvideCancel()
	}
	if ix.DHT != nil {
		ix.DHT.Close()
	}
	ix.PS.UnregisterTopicValidator(common.TopicPubSubSearch)
	// Hold StreamMU while iterating StreamRecords — every other access in this
	// file locks it, and heartbeat handlers may still be mutating the map.
	ix.StreamMU.RLock()
	defer ix.StreamMU.RUnlock()
	for _, s := range ix.StreamRecords {
		for _, ss := range s {
			// HeartbeatStream may be nil for a record whose stream never opened
			// (BuildHeartbeatResponse guards the same field before dereferencing).
			if ss.HeartbeatStream != nil && ss.HeartbeatStream.Stream != nil {
				ss.HeartbeatStream.Stream.Close()
			}
		}
	}
}