501 lines
16 KiB
Go
501 lines
16 KiB
Go
package indexer
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"math/rand"
|
|
"oc-discovery/conf"
|
|
"oc-discovery/daemons/node/common"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
oclib "cloud.o-forge.io/core/oc-lib"
|
|
dht "github.com/libp2p/go-libp2p-kad-dht"
|
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
|
record "github.com/libp2p/go-libp2p-record"
|
|
"github.com/libp2p/go-libp2p/core/host"
|
|
pp "github.com/libp2p/go-libp2p/core/peer"
|
|
)
|
|
|
|
// dhtCacheEntry holds one indexer discovered via DHT for use in suggestion responses.
// Entries are rebuilt wholesale by the background DHT cache refresh.
type dhtCacheEntry struct {
	AI       pp.AddrInfo // peer ID and addresses of the discovered indexer
	LastSeen time.Time   // timestamp of the refresh pass that stored this entry
}
|
|
|
|
// offloadState tracks which nodes we've already proposed migration to.
// When an indexer is overloaded (fill rate > offloadThreshold) it only sends
// SuggestMigrate to a small batch at a time; peers that don't migrate within
// offloadGracePeriod are moved to alreadyTried so a new batch can be picked.
// Both maps are recreated empty once the fill rate is back at or below
// offloadThreshold.
type offloadState struct {
	inBatch      map[pp.ID]time.Time // peer → time added to current batch
	alreadyTried map[pp.ID]struct{}  // peers proposed to that didn't migrate
	mu           sync.Mutex          // guards inBatch and alreadyTried
}
|
|
|
|
// Tuning knobs for the overloaded-indexer migration (offload) logic.
const (
	offloadThreshold = 0.80 // fill rate above which to start offloading
	offloadBatchSize = 5    // max concurrent "please migrate" proposals
	// offloadGracePeriod is how long a peer may stay in the current batch
	// before it is moved to alreadyTried (three heartbeat intervals).
	offloadGracePeriod = 3 * common.RecommendedHeartbeatInterval
)
|
|
|
|
// IndexerService manages the indexer node's state: stream records, DHT, pubsub.
// It embeds the generic long-lived stream service (host, stream records and
// heartbeat hooks) specialised for PeerRecord.
type IndexerService struct {
	*common.LongLivedStreamRecordedService[PeerRecord]
	// PS is the gossip pubsub: either the one passed to NewIndexerService or a
	// new GossipSub created there when none was supplied.
	PS *pubsub.PubSub
	// DHT is the Kademlia DHT created in server mode by NewIndexerService.
	DHT *dht.IpfsDHT
	// isStrictIndexer is true when NewIndexerService received a nil pubsub.
	isStrictIndexer bool
	// mu: NOTE(review) not used in the visible part of the file — confirm what it guards.
	mu sync.RWMutex
	// nameIndex: presumably populated by initNameIndex; Close() only
	// unregisters the name-index topic validator when it is non-nil.
	nameIndex *nameIndexState
	// dhtProvideCancel stops the background DHT goroutines; invoked by Close().
	dhtProvideCancel context.CancelFunc
	// bornAt is the UTC creation time, reported in every heartbeat response.
	bornAt time.Time
	// Passive DHT cache: refreshed every 2 min in background, used for suggestions.
	dhtCache   []dhtCacheEntry
	dhtCacheMu sync.RWMutex // guards dhtCache
	// Offload state for overloaded-indexer migration proposals.
	offload offloadState
	// referencedNodes holds nodes that have designated this indexer as their
	// search referent (Heartbeat.Referent=true). Used for distributed search.
	referencedNodes   map[pp.ID]PeerRecord
	referencedNodesMu sync.RWMutex
	// pendingSearches maps queryID → result channel for in-flight searches.
	pendingSearches   map[string]chan []common.SearchHit
	pendingSearchesMu sync.Mutex
	// behavior tracks per-node compliance (heartbeat rate, publish/get volume,
	// identity consistency, signature failures).
	behavior *NodeBehaviorTracker
	// connGuard limits new-connection bursts to protect public indexers.
	connGuard *ConnectionRateGuard
}
|
|
|
|
// NewIndexerService creates an IndexerService.
|
|
// If ps is nil, this is a strict indexer (no pre-existing gossip sub from a node).
|
|
func NewIndexerService(h host.Host, ps *pubsub.PubSub, maxNode int) *IndexerService {
|
|
logger := oclib.GetLogger()
|
|
logger.Info().Msg("open indexer mode...")
|
|
var err error
|
|
ix := &IndexerService{
|
|
LongLivedStreamRecordedService: common.NewStreamRecordedService[PeerRecord](h, maxNode),
|
|
isStrictIndexer: ps == nil,
|
|
referencedNodes: map[pp.ID]PeerRecord{},
|
|
pendingSearches: map[string]chan []common.SearchHit{},
|
|
behavior: newNodeBehaviorTracker(),
|
|
connGuard: newConnectionRateGuard(),
|
|
}
|
|
if ps == nil {
|
|
ps, err = pubsub.NewGossipSub(context.Background(), ix.Host)
|
|
if err != nil {
|
|
panic(err) // can't run your indexer without a propagation pubsub
|
|
}
|
|
}
|
|
ix.PS = ps
|
|
|
|
if ix.isStrictIndexer {
|
|
logger.Info().Msg("connect to indexers as strict indexer...")
|
|
common.ConnectToIndexers(h, conf.GetConfig().MinIndexer, conf.GetConfig().MaxIndexer*2)
|
|
logger.Info().Msg("subscribe to decentralized search flow as strict indexer...")
|
|
go ix.SubscribeToSearch(ix.PS, nil)
|
|
}
|
|
|
|
logger.Info().Msg("init distributed name index...")
|
|
ix.initNameIndex(ps)
|
|
ix.LongLivedStreamRecordedService.AfterDelete = func(pid pp.ID, name, did string) {
|
|
ix.publishNameEvent(NameIndexDelete, name, pid.String(), did)
|
|
// Remove behavior state for peers that are no longer connected and
|
|
// have no active ban — keeps memory bounded to the live node set.
|
|
ix.behavior.Cleanup(pid)
|
|
}
|
|
|
|
// AllowInbound: fired once per stream open, before any heartbeat is decoded.
|
|
// 1. Reject peers that are currently banned (behavioral strikes).
|
|
// 2. For genuinely new connections, apply the burst guard.
|
|
ix.AllowInbound = func(remotePeer pp.ID, isNew bool) error {
|
|
if ix.behavior.IsBanned(remotePeer) {
|
|
return errors.New("peer is banned")
|
|
}
|
|
if isNew && !ix.connGuard.Allow() {
|
|
return errors.New("connection rate limit exceeded, retry later")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ValidateHeartbeat: fired on every heartbeat tick for an established stream.
|
|
// Checks heartbeat cadence — rejects if the node is sending too fast.
|
|
ix.ValidateHeartbeat = func(remotePeer pp.ID) error {
|
|
return ix.behavior.RecordHeartbeat(remotePeer)
|
|
}
|
|
|
|
// Parse bootstrap peers from configured indexer addresses so the DHT can
|
|
// find its routing table entries even in a fresh deployment.
|
|
var bootstrapPeers []pp.AddrInfo
|
|
for _, addrStr := range strings.Split(conf.GetConfig().IndexerAddresses, ",") {
|
|
addrStr = strings.TrimSpace(addrStr)
|
|
if addrStr == "" {
|
|
continue
|
|
}
|
|
if ad, err := pp.AddrInfoFromString(addrStr); err == nil {
|
|
bootstrapPeers = append(bootstrapPeers, *ad)
|
|
}
|
|
}
|
|
dhtOpts := []dht.Option{
|
|
dht.Mode(dht.ModeServer),
|
|
dht.ProtocolPrefix("oc"),
|
|
dht.Validator(record.NamespacedValidator{
|
|
"node": PeerRecordValidator{},
|
|
"name": DefaultValidator{},
|
|
"pid": DefaultValidator{},
|
|
}),
|
|
}
|
|
if len(bootstrapPeers) > 0 {
|
|
dhtOpts = append(dhtOpts, dht.BootstrapPeers(bootstrapPeers...))
|
|
}
|
|
if ix.DHT, err = dht.New(context.Background(), ix.Host, dhtOpts...); err != nil {
|
|
logger.Info().Msg(err.Error())
|
|
return nil
|
|
}
|
|
|
|
// Make the DHT available for replenishment from other packages.
|
|
common.SetDiscoveryDHT(ix.DHT)
|
|
|
|
ix.bornAt = time.Now().UTC()
|
|
ix.offload.inBatch = make(map[pp.ID]time.Time)
|
|
ix.offload.alreadyTried = make(map[pp.ID]struct{})
|
|
ix.initNodeHandler()
|
|
|
|
// Build and send a HeartbeatResponse after each received node heartbeat.
|
|
// Raw metrics only — no pre-cooked score. Node computes the score itself.
|
|
ix.BuildHeartbeatResponse = func(remotePeer pp.ID, need int, challenges []string, challengeDID string, referent bool) *common.HeartbeatResponse {
|
|
ix.StreamMU.RLock()
|
|
peerCount := len(ix.StreamRecords[common.ProtocolHeartbeat])
|
|
// Collect lastSeen per active peer for challenge responses.
|
|
type peerMeta struct {
|
|
found bool
|
|
lastSeen time.Time
|
|
}
|
|
peerLookup := make(map[string]peerMeta, peerCount)
|
|
var remotePeerRecord PeerRecord
|
|
for pid, rec := range ix.StreamRecords[common.ProtocolHeartbeat] {
|
|
var ls time.Time
|
|
if rec.HeartbeatStream != nil && rec.HeartbeatStream.UptimeTracker != nil {
|
|
ls = rec.HeartbeatStream.UptimeTracker.LastSeen
|
|
}
|
|
peerLookup[pid.String()] = peerMeta{found: true, lastSeen: ls}
|
|
if pid == remotePeer {
|
|
remotePeerRecord = rec.Record
|
|
}
|
|
}
|
|
ix.StreamMU.RUnlock()
|
|
|
|
// Update referent designation: node marks its best-scored indexer with Referent=true.
|
|
ix.updateReferent(remotePeer, remotePeerRecord, referent)
|
|
|
|
maxN := ix.MaxNodesConn()
|
|
fillRate := 0.0
|
|
if maxN > 0 {
|
|
fillRate = float64(peerCount) / float64(maxN)
|
|
if fillRate > 1 {
|
|
fillRate = 1
|
|
}
|
|
}
|
|
|
|
resp := &common.HeartbeatResponse{
|
|
FillRate: fillRate,
|
|
PeerCount: peerCount,
|
|
MaxNodes: maxN,
|
|
BornAt: ix.bornAt,
|
|
}
|
|
|
|
// Answer each challenged PeerID with raw found + lastSeen.
|
|
for _, pidStr := range challenges {
|
|
meta := peerLookup[pidStr] // zero value if not found
|
|
entry := common.ChallengeEntry{
|
|
PeerID: pidStr,
|
|
Found: meta.found,
|
|
LastSeen: meta.lastSeen,
|
|
}
|
|
resp.Challenges = append(resp.Challenges, entry)
|
|
}
|
|
|
|
// DHT challenge: retrieve the node's own DID to prove DHT is functional.
|
|
if challengeDID != "" {
|
|
ctx3, cancel3 := context.WithTimeout(context.Background(), 3*time.Second)
|
|
val, err := ix.DHT.GetValue(ctx3, "/node/"+challengeDID)
|
|
cancel3()
|
|
resp.DHTFound = err == nil
|
|
if err == nil {
|
|
resp.DHTPayload = json.RawMessage(val)
|
|
}
|
|
}
|
|
|
|
// Random sample of connected nodes as witnesses (up to 3).
|
|
// Never include the requesting peer itself — asking a node to witness
|
|
// itself is circular and meaningless.
|
|
ix.StreamMU.RLock()
|
|
for pidStr := range peerLookup {
|
|
if len(resp.Witnesses) >= 3 {
|
|
break
|
|
}
|
|
pid, err := pp.Decode(pidStr)
|
|
if err != nil || pid == remotePeer || pid == ix.Host.ID() {
|
|
continue
|
|
}
|
|
addrs := ix.Host.Peerstore().Addrs(pid)
|
|
ai := common.FilterLoopbackAddrs(pp.AddrInfo{ID: pid, Addrs: addrs})
|
|
if len(ai.Addrs) > 0 {
|
|
resp.Witnesses = append(resp.Witnesses, ai)
|
|
}
|
|
}
|
|
ix.StreamMU.RUnlock()
|
|
|
|
// Attach suggestions: exactly `need` entries from the DHT cache.
|
|
// If the indexer is overloaded (SuggestMigrate will be set below), always
|
|
// provide at least 1 suggestion even when need == 0, so the node has
|
|
// somewhere to go.
|
|
suggestionsNeeded := need
|
|
if fillRate > offloadThreshold && suggestionsNeeded < 1 {
|
|
suggestionsNeeded = 1
|
|
}
|
|
if suggestionsNeeded > 0 {
|
|
ix.dhtCacheMu.RLock()
|
|
// When offloading, pick from a random offset within the top N of the
|
|
// cache so concurrent migrations spread across multiple targets rather
|
|
// than all rushing to the same least-loaded indexer (thundering herd).
|
|
// For normal need-based suggestions the full sorted order is fine.
|
|
cache := ix.dhtCache
|
|
if fillRate > offloadThreshold && len(cache) > suggestionsNeeded {
|
|
const spreadWindow = 5 // sample from the top-5 least-loaded
|
|
window := spreadWindow
|
|
if window > len(cache) {
|
|
window = len(cache)
|
|
}
|
|
start := rand.Intn(window)
|
|
cache = cache[start:]
|
|
}
|
|
for _, e := range cache {
|
|
if len(resp.Suggestions) >= suggestionsNeeded {
|
|
break
|
|
}
|
|
// Never suggest the requesting peer itself or this indexer.
|
|
if e.AI.ID == remotePeer || e.AI.ID == h.ID() {
|
|
continue
|
|
}
|
|
resp.Suggestions = append(resp.Suggestions, e.AI)
|
|
}
|
|
ix.dhtCacheMu.RUnlock()
|
|
}
|
|
|
|
// Offload logic: when fill rate is too high, selectively ask nodes to migrate.
|
|
if fillRate > offloadThreshold && len(resp.Suggestions) > 0 {
|
|
now := time.Now()
|
|
ix.offload.mu.Lock()
|
|
// Expire stale batch entries -> move to alreadyTried.
|
|
for pid, addedAt := range ix.offload.inBatch {
|
|
if now.Sub(addedAt) > offloadGracePeriod {
|
|
ix.offload.alreadyTried[pid] = struct{}{}
|
|
delete(ix.offload.inBatch, pid)
|
|
}
|
|
}
|
|
// Reset alreadyTried if we've exhausted the whole pool.
|
|
if len(ix.offload.alreadyTried) >= peerCount {
|
|
ix.offload.alreadyTried = make(map[pp.ID]struct{})
|
|
}
|
|
_, tried := ix.offload.alreadyTried[remotePeer]
|
|
_, inBatch := ix.offload.inBatch[remotePeer]
|
|
if !tried {
|
|
if inBatch {
|
|
resp.SuggestMigrate = true
|
|
} else if len(ix.offload.inBatch) < offloadBatchSize {
|
|
ix.offload.inBatch[remotePeer] = now
|
|
resp.SuggestMigrate = true
|
|
}
|
|
}
|
|
ix.offload.mu.Unlock()
|
|
} else if fillRate <= offloadThreshold {
|
|
// Fill rate back to normal: reset offload state.
|
|
ix.offload.mu.Lock()
|
|
if len(ix.offload.inBatch) > 0 || len(ix.offload.alreadyTried) > 0 {
|
|
ix.offload.inBatch = make(map[pp.ID]time.Time)
|
|
ix.offload.alreadyTried = make(map[pp.ID]struct{})
|
|
}
|
|
ix.offload.mu.Unlock()
|
|
}
|
|
|
|
// Bootstrap: if this indexer has no indexers of its own, probe the
|
|
// connecting peer to check it supports ProtocolHeartbeat (i.e. it is
|
|
// itself an indexer). Plain nodes do not register the handler and the
|
|
// negotiation fails instantly — no wasted heartbeat cycle.
|
|
// Run in a goroutine: the probe is a short blocking stream open.
|
|
if len(common.Indexers.GetAddrs()) == 0 && remotePeer != h.ID() {
|
|
pid := remotePeer
|
|
go func() {
|
|
if !common.SupportsHeartbeat(h, pid) {
|
|
logger.Debug().Str("peer", pid.String()).
|
|
Msg("[bootstrap] inbound peer has no heartbeat handler — not an indexer, skipping")
|
|
return
|
|
}
|
|
addrs := h.Peerstore().Addrs(pid)
|
|
ai := common.FilterLoopbackAddrs(pp.AddrInfo{ID: pid, Addrs: addrs})
|
|
if len(ai.Addrs) == 0 {
|
|
return
|
|
}
|
|
key := pid.String()
|
|
if !common.Indexers.ExistsAddr(key) {
|
|
adCopy := ai
|
|
common.Indexers.SetAddr(key, &adCopy)
|
|
common.Indexers.NudgeIt()
|
|
logger.Info().Str("peer", key).Msg("[bootstrap] no indexers — added inbound indexer peer as candidate")
|
|
}
|
|
}()
|
|
}
|
|
|
|
return resp
|
|
}
|
|
|
|
// Advertise this indexer in the DHT so nodes can discover it.
|
|
fillRateFn := func() float64 {
|
|
ix.StreamMU.RLock()
|
|
n := len(ix.StreamRecords[common.ProtocolHeartbeat])
|
|
ix.StreamMU.RUnlock()
|
|
maxN := ix.MaxNodesConn()
|
|
if maxN <= 0 {
|
|
return 0
|
|
}
|
|
rate := float64(n) / float64(maxN)
|
|
if rate > 1 {
|
|
rate = 1
|
|
}
|
|
return rate
|
|
}
|
|
ix.startDHTCacheRefresh()
|
|
ix.startDHTProvide(fillRateFn)
|
|
|
|
return ix
|
|
}
|
|
|
|
// startDHTCacheRefresh periodically queries the DHT for peer indexers and
|
|
// refreshes ix.dhtCache. This passive cache is used by BuildHeartbeatResponse
|
|
// to suggest better indexers to connected nodes without any per-request cost.
|
|
func (ix *IndexerService) startDHTCacheRefresh() {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
// Store cancel alongside the provide cancel so Close() stops both.
|
|
prevCancel := ix.dhtProvideCancel
|
|
ix.dhtProvideCancel = func() {
|
|
if prevCancel != nil {
|
|
prevCancel()
|
|
}
|
|
cancel()
|
|
}
|
|
go func() {
|
|
logger := oclib.GetLogger()
|
|
refresh := func() {
|
|
if ix.DHT == nil {
|
|
return
|
|
}
|
|
// Fetch more than needed so SelectByFillRate can filter for diversity.
|
|
raw := common.DiscoverIndexersFromDHT(ix.Host, ix.DHT, 30)
|
|
if len(raw) == 0 {
|
|
return
|
|
}
|
|
// Remove self before selection.
|
|
filtered := raw[:0]
|
|
for _, ai := range raw {
|
|
if ai.ID != ix.Host.ID() {
|
|
filtered = append(filtered, ai)
|
|
}
|
|
}
|
|
// SelectByFillRate applies /24 subnet diversity and fill-rate weighting.
|
|
// Fill rates are unknown at this stage (nil map) so all peers get
|
|
// the neutral prior f=0.5 — diversity filtering still applies.
|
|
selected := common.SelectByFillRate(filtered, nil, 10)
|
|
now := time.Now()
|
|
ix.dhtCacheMu.Lock()
|
|
ix.dhtCache = ix.dhtCache[:0]
|
|
for _, ai := range selected {
|
|
ix.dhtCache = append(ix.dhtCache, dhtCacheEntry{AI: ai, LastSeen: now})
|
|
}
|
|
ix.dhtCacheMu.Unlock()
|
|
logger.Info().Int("cached", len(selected)).Msg("[dht] indexer suggestion cache refreshed")
|
|
}
|
|
// Initial delay: let the DHT routing table warm up first.
|
|
select {
|
|
case <-time.After(30 * time.Second):
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
refresh()
|
|
t := time.NewTicker(2 * time.Minute)
|
|
defer t.Stop()
|
|
for {
|
|
select {
|
|
case <-t.C:
|
|
refresh()
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// startDHTProvide bootstraps the DHT and starts a goroutine that periodically
|
|
// advertises this indexer under the well-known provider key.
|
|
func (ix *IndexerService) startDHTProvide(fillRateFn func() float64) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
ix.dhtProvideCancel = cancel
|
|
go func() {
|
|
logger := oclib.GetLogger()
|
|
// Wait until a routable (non-loopback) address is available.
|
|
for i := 0; i < 12; i++ {
|
|
addrs := ix.Host.Addrs()
|
|
if len(addrs) > 0 && !strings.Contains(addrs[len(addrs)-1].String(), "127.0.0.1") {
|
|
break
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-time.After(5 * time.Second):
|
|
}
|
|
}
|
|
if err := ix.DHT.Bootstrap(ctx); err != nil {
|
|
logger.Warn().Err(err).Msg("[dht] bootstrap failed")
|
|
}
|
|
provide := func() {
|
|
pCtx, pCancel := context.WithTimeout(ctx, 30*time.Second)
|
|
defer pCancel()
|
|
if err := ix.DHT.Provide(pCtx, common.IndexerCID(), true); err != nil {
|
|
logger.Warn().Err(err).Msg("[dht] Provide failed")
|
|
} else {
|
|
logger.Info().Float64("fill_rate", fillRateFn()).Msg("[dht] indexer advertised in DHT")
|
|
}
|
|
}
|
|
provide()
|
|
t := time.NewTicker(common.RecommendedHeartbeatInterval)
|
|
defer t.Stop()
|
|
for {
|
|
select {
|
|
case <-t.C:
|
|
provide()
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
func (ix *IndexerService) Close() {
|
|
if ix.dhtProvideCancel != nil {
|
|
ix.dhtProvideCancel()
|
|
}
|
|
ix.DHT.Close()
|
|
ix.PS.UnregisterTopicValidator(common.TopicPubSubSearch)
|
|
if ix.nameIndex != nil {
|
|
ix.PS.UnregisterTopicValidator(TopicNameIndex)
|
|
}
|
|
for _, s := range ix.StreamRecords {
|
|
for _, ss := range s {
|
|
ss.HeartbeatStream.Stream.Close()
|
|
}
|
|
}
|
|
}
|