package common import ( "errors" "sync" "time" pp "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" ) type Score struct { FirstContacted time.Time UptimeTracker *UptimeTracker LastFillRate float64 Score float64 // IsSeed marks indexers that came from the IndexerAddresses static config. // Seeds are sticky: they are never evicted by the score threshold alone. // A seed is only removed when: (a) heartbeat fails, or (b) it sends // SuggestMigrate and the node already has MinIndexer non-seed alternatives. IsSeed bool // challenge bookkeeping (2-3 peers per batch, raw data returned by indexer) hbCount int // heartbeats sent since last challenge batch nextChallenge int // send challenges when hbCount reaches this (rand 1-10) challengeTotal int // number of own-PeerID challenges sent (ground truth) challengeCorrect int // own PeerID found AND lastSeen within 2×interval // fill rate consistency: cross-check reported fillRate vs peerCount/maxNodes fillChecked int fillConsistent int // BornAt stability LastBornAt time.Time bornAtChanges int // DHT challenge dhtChecked int dhtSuccess int dhtBatchCounter int // Peer witnesses witnessChecked int witnessConsistent int } // computeNodeSideScore computes the node's quality assessment of an indexer from raw metrics. // All ratios are in [0,1]; result is in [0,100]. // - uptimeRatio : gap-aware fraction of lifetime the indexer was reachable // - challengeAccuracy: own-PeerID challenges answered correctly (found + recent lastSeen) // - latencyScore : 1 - RTT/maxRTT, clamped [0,1] // - fillScore : 1 - fillRate — prefer less-loaded indexers // - fillConsistency : fraction of ticks where peerCount/maxNodes ≈ fillRate (±10%) func (s *Score) ComputeNodeSideScore(latencyScore float64) float64 { uptime := s.UptimeTracker.UptimeRatio() challengeAccuracy := 1.0 if s.challengeTotal > 0 { challengeAccuracy = float64(s.challengeCorrect) / float64(s.challengeTotal) } fillScore := 1.0 - s.LastFillRate fillConsistency := 1.0 if s.fillChecked > 0 { fillConsistency = float64(s.fillConsistent) / float64(s.fillChecked) } witnessConsistency := 1.0 if s.witnessChecked > 0 { witnessConsistency = float64(s.witnessConsistent) / float64(s.witnessChecked) } dhtSuccessRate := 1.0 if s.dhtChecked > 0 { dhtSuccessRate = float64(s.dhtSuccess) / float64(s.dhtChecked) } base := ((0.20 * uptime) + (0.20 * challengeAccuracy) + (0.15 * latencyScore) + (0.10 * fillScore) + (0.10 * fillConsistency) + (0.15 * witnessConsistency) + (0.10 * dhtSuccessRate)) * 100 // BornAt stability: each unexpected BornAt change penalises by 30%. bornAtPenalty := 1.0 - 0.30*float64(s.bornAtChanges) if bornAtPenalty < 0 { bornAtPenalty = 0 } return base * bornAtPenalty } type Directory struct { MuAddr sync.RWMutex MuScore sync.RWMutex MuStream sync.RWMutex Addrs map[string]*pp.AddrInfo Scores map[string]*Score Nudge chan struct{} Streams ProtocolStream } func (d *Directory) ExistsScore(a string) bool { d.MuScore.RLock() defer d.MuScore.RUnlock() for addr, ai := range d.Scores { if ai != nil && (a == addr) { return true } } return false } func (d *Directory) GetScore(a string) *Score { d.MuScore.RLock() defer d.MuScore.RUnlock() for addr, s := range d.Scores { if s != nil && (a == addr) { sCopy := *s return &sCopy } } return nil } func (d *Directory) GetScores() map[string]*Score { d.MuScore.RLock() defer d.MuScore.RUnlock() score := map[string]*Score{} for addr, s := range d.Scores { score[addr] = s } return score } func (d *Directory) DeleteScore(a string) { d.MuScore.RLock() defer d.MuScore.RUnlock() score := map[string]*Score{} for addr, s := range d.Scores { if a != addr { score[addr] = s } } d.Scores = score } func (d *Directory) SetScore(addr string, score *Score) *pp.AddrInfo { d.MuScore.Lock() defer d.MuScore.Unlock() d.Scores[addr] = score return nil } func (d *Directory) ExistsAddr(addrOrId string) bool { d.MuAddr.RLock() defer d.MuAddr.RUnlock() for addr, ai := range d.Addrs { if ai != nil && (addrOrId == ai.ID.String() || addrOrId == addr) { return true } } return false } func (d *Directory) GetAddr(addrOrId string) *pp.AddrInfo { d.MuAddr.RLock() defer d.MuAddr.RUnlock() for addr, ai := range d.Addrs { if ai != nil && (addrOrId == ai.ID.String() || addrOrId == addr) { aiCopy := *ai return &aiCopy } } return nil } func (d *Directory) DeleteAddr(a string) { d.MuAddr.RLock() defer d.MuAddr.RUnlock() addrs := map[string]*pp.AddrInfo{} for addr, s := range d.Addrs { if a != addr { addrs[addr] = s } } d.Addrs = addrs } func (d *Directory) SetAddr(addr string, info *pp.AddrInfo) *pp.AddrInfo { d.MuAddr.Lock() defer d.MuAddr.Unlock() d.Addrs[addr] = info return nil } func (d *Directory) GetAddrIDs() []pp.ID { d.MuAddr.RLock() defer d.MuAddr.RUnlock() indexers := make([]pp.ID, 0, len(d.Addrs)) for _, ai := range d.Addrs { if ai != nil { indexers = append(indexers, ai.ID) } } return Shuffle(indexers) } func (d *Directory) GetAddrsStr() []string { d.MuAddr.RLock() defer d.MuAddr.RUnlock() indexers := make([]string, 0, len(d.Addrs)) for s, ai := range d.Addrs { if ai != nil { indexers = append(indexers, s) } } return Shuffle(indexers) } type Entry struct { Addr string Info *pp.AddrInfo } func (d *Directory) GetAddrs() []Entry { d.MuAddr.RLock() defer d.MuAddr.RUnlock() indexers := make([]Entry, 0, len(d.Addrs)) for addr, ai := range d.Addrs { if ai != nil { indexers = append(indexers, Entry{ Addr: addr, Info: ai, }) } } return Shuffle(indexers) } // NudgeIndexerHeartbeat signals the indexer heartbeat goroutine to fire immediately. func (d *Directory) NudgeIt() { select { case d.Nudge <- struct{}{}: default: // nudge already pending, skip } } type ProtocolStream map[protocol.ID]map[pp.ID]*Stream func (ps ProtocolStream) Get(protocol protocol.ID) map[pp.ID]*Stream { if ps[protocol] == nil { ps[protocol] = map[pp.ID]*Stream{} } return ps[protocol] } func (ps ProtocolStream) GetPerID(protocol protocol.ID, peerID pp.ID) *Stream { if ps[protocol] == nil { ps[protocol] = map[pp.ID]*Stream{} } return ps[protocol][peerID] } func (ps ProtocolStream) Add(protocol protocol.ID, peerID *pp.ID, s *Stream) error { if ps[protocol] == nil { ps[protocol] = map[pp.ID]*Stream{} } if peerID != nil { if s != nil { ps[protocol][*peerID] = s } else { return errors.New("unable to add stream : stream missing") } } return nil } func (ps ProtocolStream) Delete(protocol protocol.ID, peerID *pp.ID) { if streams, ok := ps[protocol]; ok { if peerID != nil && streams[*peerID] != nil && streams[*peerID].Stream != nil { streams[*peerID].Stream.Close() delete(streams, *peerID) } else { for _, s := range ps { for _, v := range s { if v.Stream != nil { v.Stream.Close() } } } delete(ps, protocol) } } } var Indexers = &Directory{ Addrs: map[string]*pp.AddrInfo{}, Scores: map[string]*Score{}, Nudge: make(chan struct{}, 1), Streams: ProtocolStream{}, }