package common

import (
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"math/rand"
	"strings"
	"sync"
	"time"

	oclib "cloud.o-forge.io/core/oc-lib"
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/network"
	pp "github.com/libp2p/go-libp2p/core/peer"
	"github.com/libp2p/go-libp2p/core/protocol"
)

// LongLivedStreamRecordedService maintains per-protocol, per-peer records of
// long-lived heartbeat streams on top of the base pub/sub service.
// All access to StreamRecords must hold StreamMU.
type LongLivedStreamRecordedService[T any] struct {
	*LongLivedPubSubService

	// StreamRecords maps protocol -> peer -> live stream record. Guarded by StreamMU.
	StreamRecords map[protocol.ID]map[pp.ID]*StreamRecord[T]
	StreamMU      sync.RWMutex
	maxNodesConn  int

	// AllowInbound, when set, is called once at stream open before any heartbeat
	// is decoded. remotePeer is the connecting peer; isNew is true when no
	// StreamRecord exists yet (first-ever connection). Return a non-nil error
	// to immediately reset the stream and refuse the peer.
	AllowInbound func(remotePeer pp.ID, isNew bool) error

	// ValidateHeartbeat, when set, is called inside the heartbeat loop after
	// each successful CheckHeartbeat decode. Return a non-nil error to reset
	// the stream and terminate the session.
	ValidateHeartbeat func(remotePeer pp.ID) error

	// AfterHeartbeat is called after each successful heartbeat with the full
	// decoded Heartbeat so the hook can use the fresh embedded PeerRecord.
	AfterHeartbeat func(hb *Heartbeat)

	// AfterDelete is called after gc() evicts an expired peer, outside the lock.
	// name and did may be empty if the HeartbeatStream had no metadata.
	AfterDelete func(pid pp.ID, name string, did string)

	// BuildHeartbeatResponse, when set, is called after each successfully decoded
	// heartbeat to build the response sent back to the node.
	// remotePeer is the peer that sent the heartbeat (used for offload routing).
	// need is how many more indexers the node wants (from hb.Need).
	// referent is true when the node designated this indexer as its search referent.
	BuildHeartbeatResponse func(remotePeer pp.ID, need int, challenges []string, challengeDID string, referent bool) *HeartbeatResponse
}

// MaxNodesConn returns the configured maximum number of node connections.
func (ix *LongLivedStreamRecordedService[T]) MaxNodesConn() int {
	return ix.maxNodesConn
}

// NewStreamRecordedService builds the service and starts its background
// garbage-collection and snapshot loops. StartGC and Snapshot each spawn
// their own goroutine, so the constructor returns immediately.
// (The original wrapped both calls in an extra `go`, double-spawning.)
func NewStreamRecordedService[T any](h host.Host, maxNodesConn int) *LongLivedStreamRecordedService[T] {
	service := &LongLivedStreamRecordedService[T]{
		LongLivedPubSubService: NewLongLivedPubSubService(h),
		StreamRecords:          map[protocol.ID]map[pp.ID]*StreamRecord[T]{},
		maxNodesConn:           maxNodesConn,
	}
	// Garbage collection is needed on every Map of Long-Lived Stream...
	// it may be a top level redesigned
	service.StartGC(30 * time.Second)
	service.Snapshot(1 * time.Hour)
	return service
}

// StartGC runs gc() every interval in a background goroutine. The goroutine
// has no stop mechanism and lives for the life of the process.
func (ix *LongLivedStreamRecordedService[T]) StartGC(interval time.Duration) {
	go func() {
		t := time.NewTicker(interval)
		defer t.Stop()
		for range t.C {
			// NOTE(review): debug output left in place; consider routing
			// through the structured logger instead.
			fmt.Println("ACTUALLY RELATED INDEXERS", Indexers.Addrs, len(Indexers.Addrs))
			ix.gc()
		}
	}()
}

// gc evicts expired heartbeat records from every protocol map, then invokes
// the AfterDelete hook for each evicted peer outside the lock.
func (ix *LongLivedStreamRecordedService[T]) gc() {
	ix.StreamMU.Lock()
	now := time.Now().UTC()
	if ix.StreamRecords[ProtocolHeartbeat] == nil {
		ix.StreamRecords[ProtocolHeartbeat] = map[pp.ID]*StreamRecord[T]{}
		ix.StreamMU.Unlock()
		return
	}
	streams := ix.StreamRecords[ProtocolHeartbeat]

	type gcEntry struct {
		pid  pp.ID
		name string
		did  string
	}
	var evicted []gcEntry
	for pid, rec := range streams {
		hs := rec.HeartbeatStream
		// BUG FIX: the original dereferenced rec.HeartbeatStream (and its
		// UptimeTracker) in this condition BEFORE its nil check, panicking on
		// records without stream metadata. A nil HeartbeatStream is now
		// treated as expired and evicted.
		expired := hs == nil || now.After(hs.Expiry) ||
			(hs.UptimeTracker != nil && now.Sub(hs.UptimeTracker.LastSeen) > 2*hs.Expiry.Sub(now))
		if !expired {
			continue
		}
		name, did := "", ""
		if hs != nil {
			name = hs.Name
			did = hs.DID
		}
		evicted = append(evicted, gcEntry{pid, name, did})
		// Remove the peer from every protocol map, not only heartbeat.
		// (delete on a missing key is a no-op, so no presence check needed.)
		for _, sstreams := range ix.StreamRecords {
			delete(sstreams, pid)
		}
	}
	ix.StreamMU.Unlock()

	// Hooks run outside the lock so they can safely re-enter the service.
	if ix.AfterDelete != nil {
		for _, e := range evicted {
			ix.AfterDelete(e.pid, e.name, e.did)
		}
	}
}

// Snapshot logs the DID of every known stream record every interval, in a
// background goroutine that lives for the life of the process.
func (ix *LongLivedStreamRecordedService[T]) Snapshot(interval time.Duration) {
	go func() {
		logger := oclib.GetLogger()
		t := time.NewTicker(interval)
		defer t.Stop()
		for range t.C {
			for _, inf := range ix.snapshot() {
				logger.Info().Msg(" -> " + inf.DID)
			}
		}
	}()
}

// -------- Snapshot / Query --------

// snapshot returns a flat slice of all stream records across protocols.
// Read-only, so an RLock suffices (the original took a full write lock and
// pre-sized the slice with the protocol count instead of the stream count).
func (ix *LongLivedStreamRecordedService[T]) snapshot() []*StreamRecord[T] {
	ix.StreamMU.RLock()
	defer ix.StreamMU.RUnlock()
	n := 0
	for _, streams := range ix.StreamRecords {
		n += len(streams)
	}
	out := make([]*StreamRecord[T], 0, n)
	for _, streams := range ix.StreamRecords {
		for _, stream := range streams {
			out = append(out, stream)
		}
	}
	return out
}

// HandleHeartbeat is the stream handler for the long-lived bidirectional
// heartbeat protocol. It loops decoding heartbeats from the stream, creates
// or refreshes the sender's StreamRecord, runs the optional policy hooks
// (AllowInbound, ValidateHeartbeat, AfterHeartbeat) and, when configured,
// writes a HeartbeatResponse back on the same stream.
func (ix *LongLivedStreamRecordedService[T]) HandleHeartbeat(s network.Stream) {
	logger := oclib.GetLogger()
	defer s.Close()

	// AllowInbound: burst guard + ban check before the first byte is read.
	if ix.AllowInbound != nil {
		remotePeer := s.Conn().RemotePeer()
		ix.StreamMU.RLock()
		_, exists := ix.StreamRecords[ProtocolHeartbeat][remotePeer]
		ix.StreamMU.RUnlock()
		if err := ix.AllowInbound(remotePeer, !exists); err != nil {
			logger.Warn().Err(err).Str("peer", remotePeer.String()).Msg("inbound connection refused")
			s.Reset()
			return
		}
	}

	dec := json.NewDecoder(s)
	for {
		ix.StreamMU.Lock()
		if ix.StreamRecords[ProtocolHeartbeat] == nil {
			ix.StreamRecords[ProtocolHeartbeat] = map[pp.ID]*StreamRecord[T]{}
		}
		streams := ix.StreamRecords[ProtocolHeartbeat]
		// Hand CheckHeartbeat an anonymized view so it does not depend on the
		// concrete StreamRecord[T] type.
		streamsAnonym := map[pp.ID]HeartBeatStreamed{}
		for k, v := range streams {
			streamsAnonym[k] = v
		}
		ix.StreamMU.Unlock()

		pid, hb, err := CheckHeartbeat(ix.Host, s, dec, streamsAnonym, &ix.StreamMU, ix.maxNodesConn)
		if err != nil {
			// Stream-level errors (EOF, reset, closed) mean the connection is gone
			// — exit so the goroutine doesn't spin forever on a dead stream.
			// Metric/policy errors (score too low, too many connections) are transient
			// — those are also stream-terminal since the stream carries one session.
			if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) ||
				strings.Contains(err.Error(), "reset") ||
				strings.Contains(err.Error(), "closed") ||
				strings.Contains(err.Error(), "too many connections") {
				logger.Info().Err(err).Msg("heartbeat stream terminated, closing handler")
				return
			}
			logger.Warn().Err(err).Msg("heartbeat check failed, retrying on same stream")
			continue
		}

		// ValidateHeartbeat: per-tick behavioral check (rate limiting, bans).
		if ix.ValidateHeartbeat != nil {
			if err := ix.ValidateHeartbeat(*pid); err != nil {
				logger.Warn().Err(err).Str("peer", pid.String()).Msg("heartbeat rejected, closing stream")
				s.Reset()
				return
			}
		}

		ix.StreamMU.Lock()
		if rec, ok := streams[*pid]; ok {
			// Known peer: refresh the record in place.
			rec.DID = hb.DID
			// Preserve the existing UptimeTracker so TotalOnline accumulates correctly.
			// hb.Stream is a fresh Stream with no UptimeTracker; carry the old one over.
			oldTracker := rec.GetUptimeTracker()
			rec.HeartbeatStream = hb.Stream
			if oldTracker != nil {
				rec.HeartbeatStream.UptimeTracker = oldTracker
			} else {
				rec.HeartbeatStream.UptimeTracker = &UptimeTracker{FirstSeen: time.Now().UTC()}
			}
			rec.HeartbeatStream.UptimeTracker.RecordHeartbeat()
			rec.LastScore = hb.Score
			logger.Info().Msg("A new node is updated : " + pid.String())
		} else {
			// First heartbeat from this peer: create the record.
			tracker := &UptimeTracker{FirstSeen: time.Now().UTC()}
			tracker.RecordHeartbeat()
			hb.Stream.UptimeTracker = tracker
			streams[*pid] = &StreamRecord[T]{
				DID:             hb.DID,
				HeartbeatStream: hb.Stream,
				LastScore:       hb.Score,
			}
			logger.Info().Msg("A new node is subscribed : " + pid.String())
		}
		ix.StreamMU.Unlock()

		// Enrich hb.DID before calling the hook: nodes never set hb.DID directly;
		// extract it from the embedded signed PeerRecord if available, then fall
		// back to the DID stored by handleNodePublish in the stream record.
		if hb.DID == "" && len(hb.Record) > 0 {
			var partial struct {
				DID string `json:"did"`
			}
			if json.Unmarshal(hb.Record, &partial) == nil && partial.DID != "" {
				hb.DID = partial.DID
			}
		}
		if hb.DID == "" {
			ix.StreamMU.RLock()
			if rec, ok := streams[*pid]; ok {
				hb.DID = rec.DID
			}
			ix.StreamMU.RUnlock()
		}
		if ix.AfterHeartbeat != nil && hb.DID != "" {
			ix.AfterHeartbeat(hb)
		}

		// Send response back to the node (bidirectional heartbeat).
		if ix.BuildHeartbeatResponse != nil {
			if resp := ix.BuildHeartbeatResponse(s.Conn().RemotePeer(), hb.Need, hb.Challenges, hb.ChallengeDID, hb.Referent); resp != nil {
				s.SetWriteDeadline(time.Now().Add(3 * time.Second))
				// BUG FIX: the encode error was silently discarded; a failed
				// response write is worth surfacing.
				if err := json.NewEncoder(s).Encode(resp); err != nil {
					logger.Warn().Err(err).Msg("failed to write heartbeat response")
				}
				s.SetWriteDeadline(time.Time{})
			}
		}
	}
}

// CheckHeartbeat decodes one heartbeat from dec, scores the sender and, when
// the score clears the dynamic minimum, returns its peer ID and the decoded
// Heartbeat with hb.Stream populated (2-minute expiry).
// It returns an error when the indexer is at capacity, the payload cannot be
// decoded, the peer ID is invalid, or the computed score is below the
// age-ramped minimum.
func CheckHeartbeat(h host.Host, s network.Stream, dec *json.Decoder, streams map[pp.ID]HeartBeatStreamed, lock *sync.RWMutex, maxNodes int) (*pp.ID, *Heartbeat, error) {
	if len(h.Network().Peers()) >= maxNodes {
		return nil, nil, fmt.Errorf("too many connections, try another indexer")
	}
	var hb Heartbeat
	if err := dec.Decode(&hb); err != nil {
		return nil, nil, err
	}
	_, bpms, latencyScore, _ := getBandwidthChallengeRate(h, s.Conn().RemotePeer(),
		MinPayloadChallenge+int(rand.Float64()*(MaxPayloadChallenge-MinPayloadChallenge)))

	pid, err := pp.Decode(hb.PeerID)
	if err != nil {
		return nil, nil, err
	}

	uptimeRatio := float64(0)
	age := time.Duration(0)
	lock.Lock()
	if rec, ok := streams[pid]; ok && rec.GetUptimeTracker() != nil {
		uptimeRatio = rec.GetUptimeTracker().UptimeRatio()
		age = rec.GetUptimeTracker().Uptime()
	}
	lock.Unlock()

	// E: measure the indexer's own subnet diversity, not the node's view.
	diversity := getOwnDiversityRate(h)

	// fillRate: fraction of indexer capacity used — higher = more peers trust this indexer.
	fillRate := 0.0
	if maxNodes > 0 {
		fillRate = float64(len(h.Network().Peers())) / float64(maxNodes)
		if fillRate > 1 {
			fillRate = 1
		}
	}
	hb.ComputeIndexerScore(uptimeRatio, bpms, diversity, latencyScore, fillRate)

	// B: dynamic minScore — starts at 20% for brand-new peers, ramps to 80% at 24h.
	minScore := dynamicMinScore(age)
	if hb.Score < minScore {
		return nil, nil, errors.New("not enough trusting value")
	}

	hb.Stream = &Stream{
		Name:   hb.Name,
		DID:    hb.DID,
		Stream: s,
		Expiry: time.Now().UTC().Add(2 * time.Minute),
	}
	// here is the long-lived bidirectional heartbeat.
	// Return nil explicitly: the original returned the stale `err` variable,
	// which is provably nil at this point but obscured the success path.
	return &pid, &hb, nil
}