306 lines
10 KiB
Go
306 lines
10 KiB
Go
package common
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"math/rand"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
oclib "cloud.o-forge.io/core/oc-lib"
|
|
"github.com/libp2p/go-libp2p/core/host"
|
|
"github.com/libp2p/go-libp2p/core/network"
|
|
pp "github.com/libp2p/go-libp2p/core/peer"
|
|
"github.com/libp2p/go-libp2p/core/protocol"
|
|
)
|
|
|
|
// LongLivedStreamRecordedService layers per-peer stream bookkeeping on top of
// LongLivedPubSubService: it tracks one StreamRecord per (protocol, peer) and
// exposes optional policy hooks invoked at the various stages of a heartbeat
// session (admission, per-tick validation, post-heartbeat, eviction, response).
// All hooks are optional; a nil hook is skipped.
type LongLivedStreamRecordedService[T interface{}] struct {
	*LongLivedPubSubService
	// StreamRecords maps protocol -> peer -> live stream record.
	// Guarded by StreamMU (both map structure and the maps' contents).
	StreamRecords map[protocol.ID]map[pp.ID]*StreamRecord[T]
	// StreamMU protects StreamRecords; readers use RLock, mutators Lock.
	StreamMU sync.RWMutex
	// maxNodesConn caps accepted peer connections (enforced in CheckHeartbeat;
	// exposed read-only via MaxNodesConn).
	maxNodesConn int
	// AllowInbound, when set, is called once at stream open before any heartbeat
	// is decoded. remotePeer is the connecting peer; isNew is true when no
	// StreamRecord exists yet (first-ever connection). Return a non-nil error
	// to immediately reset the stream and refuse the peer.
	AllowInbound func(remotePeer pp.ID, isNew bool) error
	// ValidateHeartbeat, when set, is called inside the heartbeat loop after
	// each successful CheckHeartbeat decode. Return a non-nil error to reset
	// the stream and terminate the session.
	ValidateHeartbeat func(remotePeer pp.ID) error
	// AfterHeartbeat is called after each successful heartbeat with the full
	// decoded Heartbeat so the hook can use the fresh embedded PeerRecord.
	AfterHeartbeat func(hb *Heartbeat)
	// AfterDelete is called after gc() evicts an expired peer, outside the lock.
	// name and did may be empty if the HeartbeatStream had no metadata.
	AfterDelete func(pid pp.ID, name string, did string)
	// BuildHeartbeatResponse, when set, is called after each successfully decoded
	// heartbeat to build the response sent back to the node.
	// remotePeer is the peer that sent the heartbeat (used for offload routing).
	// need is how many more indexers the node wants (from hb.Need).
	// referent is true when the node designated this indexer as its search referent.
	// rawRecord is the fresh signed PeerRecord embedded in the heartbeat (hb.Record),
	// passed directly so the handler does not race with AfterHeartbeat goroutine
	// updating StreamRecord.Record.
	BuildHeartbeatResponse func(remotePeer pp.ID, need int, challenges []string, challengeDID string, referent bool, rawRecord json.RawMessage) *HeartbeatResponse
}
|
|
|
|
func (ix *LongLivedStreamRecordedService[T]) MaxNodesConn() int {
|
|
return ix.maxNodesConn
|
|
}
|
|
|
|
func NewStreamRecordedService[T interface{}](h host.Host, maxNodesConn int) *LongLivedStreamRecordedService[T] {
|
|
service := &LongLivedStreamRecordedService[T]{
|
|
LongLivedPubSubService: NewLongLivedPubSubService(h),
|
|
StreamRecords: map[protocol.ID]map[pp.ID]*StreamRecord[T]{},
|
|
maxNodesConn: maxNodesConn,
|
|
}
|
|
go service.StartGC(30 * time.Second)
|
|
// Garbage collection is needed on every Map of Long-Lived Stream... it may be a top level redesigned
|
|
go service.Snapshot(1 * time.Hour)
|
|
return service
|
|
}
|
|
|
|
func (ix *LongLivedStreamRecordedService[T]) StartGC(interval time.Duration) {
|
|
go func() {
|
|
t := time.NewTicker(interval)
|
|
defer t.Stop()
|
|
for range t.C {
|
|
fmt.Println("ACTUALLY RELATED INDEXERS", Indexers.Addrs, len(Indexers.Addrs))
|
|
ix.gc()
|
|
}
|
|
}()
|
|
}
|
|
|
|
func (ix *LongLivedStreamRecordedService[T]) gc() {
|
|
ix.StreamMU.Lock()
|
|
now := time.Now().UTC()
|
|
if ix.StreamRecords[ProtocolHeartbeat] == nil {
|
|
ix.StreamRecords[ProtocolHeartbeat] = map[pp.ID]*StreamRecord[T]{}
|
|
ix.StreamMU.Unlock()
|
|
return
|
|
}
|
|
streams := ix.StreamRecords[ProtocolHeartbeat]
|
|
|
|
type gcEntry struct {
|
|
pid pp.ID
|
|
name string
|
|
did string
|
|
}
|
|
var evicted []gcEntry
|
|
for pid, rec := range streams {
|
|
if now.After(rec.HeartbeatStream.Expiry) || now.Sub(rec.HeartbeatStream.UptimeTracker.LastSeen) > 2*rec.HeartbeatStream.Expiry.Sub(now) {
|
|
name, did := "", ""
|
|
if rec.HeartbeatStream != nil {
|
|
name = rec.HeartbeatStream.Name
|
|
did = rec.HeartbeatStream.DID
|
|
}
|
|
evicted = append(evicted, gcEntry{pid, name, did})
|
|
for _, sstreams := range ix.StreamRecords {
|
|
if sstreams[pid] != nil {
|
|
delete(sstreams, pid)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
ix.StreamMU.Unlock()
|
|
|
|
if ix.AfterDelete != nil {
|
|
for _, e := range evicted {
|
|
ix.AfterDelete(e.pid, e.name, e.did)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (ix *LongLivedStreamRecordedService[T]) Snapshot(interval time.Duration) {
|
|
go func() {
|
|
logger := oclib.GetLogger()
|
|
t := time.NewTicker(interval)
|
|
defer t.Stop()
|
|
for range t.C {
|
|
infos := ix.snapshot()
|
|
for _, inf := range infos {
|
|
logger.Info().Msg(" -> " + inf.DID)
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// -------- Snapshot / Query --------
|
|
func (ix *LongLivedStreamRecordedService[T]) snapshot() []*StreamRecord[T] {
|
|
ix.StreamMU.Lock()
|
|
defer ix.StreamMU.Unlock()
|
|
|
|
out := make([]*StreamRecord[T], 0, len(ix.StreamRecords))
|
|
for _, streams := range ix.StreamRecords {
|
|
for _, stream := range streams {
|
|
out = append(out, stream)
|
|
}
|
|
}
|
|
return out
|
|
}
|
|
|
|
// HandleHeartbeat runs the full lifetime of one inbound heartbeat stream:
// optional admission check (AllowInbound), then a loop that decodes one
// heartbeat per iteration via CheckHeartbeat, validates it, records/updates
// the peer's StreamRecord, fires AfterHeartbeat, and optionally writes a
// HeartbeatResponse back (bidirectional heartbeat). Returns only when the
// stream dies or a policy hook rejects the peer.
func (ix *LongLivedStreamRecordedService[T]) HandleHeartbeat(s network.Stream) {
	logger := oclib.GetLogger()
	defer s.Close()

	// AllowInbound: burst guard + ban check before the first byte is read.
	if ix.AllowInbound != nil {
		remotePeer := s.Conn().RemotePeer()
		ix.StreamMU.RLock()
		// Reading from a possibly-nil inner map is safe in Go (zero value).
		_, exists := ix.StreamRecords[ProtocolHeartbeat][remotePeer]
		ix.StreamMU.RUnlock()
		if err := ix.AllowInbound(remotePeer, !exists); err != nil {
			logger.Warn().Err(err).Str("peer", remotePeer.String()).Msg("inbound connection refused")
			s.Reset()
			return
		}
	}

	// One decoder for the whole session: heartbeats arrive as a JSON stream.
	dec := json.NewDecoder(s)
	for {
		ix.StreamMU.Lock()
		if ix.StreamRecords[ProtocolHeartbeat] == nil {
			ix.StreamRecords[ProtocolHeartbeat] = map[pp.ID]*StreamRecord[T]{}
		}
		// streams aliases the live heartbeat map; later reads below re-acquire
		// the lock before touching it.
		streams := ix.StreamRecords[ProtocolHeartbeat]
		// Shallow interface-typed copy handed to CheckHeartbeat so it can read
		// uptime state without depending on the concrete generic map type.
		streamsAnonym := map[pp.ID]HeartBeatStreamed{}
		for k, v := range streams {
			streamsAnonym[k] = v
		}
		ix.StreamMU.Unlock()
		pid, hb, err := CheckHeartbeat(ix.Host, s, dec, streamsAnonym, &ix.StreamMU, ix.maxNodesConn)
		if err != nil {
			// Stream-level errors (EOF, reset, closed) mean the connection is gone
			// — exit so the goroutine doesn't spin forever on a dead stream.
			// Metric/policy errors (score too low, too many connections) are transient
			// — those are also stream-terminal since the stream carries one session.
			// NOTE(review): classifying errors by substring match ("reset",
			// "closed") is brittle; prefer errors.Is against typed sentinels
			// if the transport exposes them.
			if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) ||
				strings.Contains(err.Error(), "reset") ||
				strings.Contains(err.Error(), "closed") ||
				strings.Contains(err.Error(), "too many connections") {
				logger.Info().Err(err).Msg("heartbeat stream terminated, closing handler")
				return
			}
			logger.Warn().Err(err).Msg("heartbeat check failed, retrying on same stream")
			continue
		}
		// ValidateHeartbeat: per-tick behavioral check (rate limiting, bans).
		if ix.ValidateHeartbeat != nil {
			if err := ix.ValidateHeartbeat(*pid); err != nil {
				logger.Warn().Err(err).Str("peer", pid.String()).Msg("heartbeat rejected, closing stream")
				s.Reset()
				return
			}
		}
		ix.StreamMU.Lock()
		// if record already seen update last seen
		if rec, ok := streams[*pid]; ok {
			rec.DID = hb.DID
			// Preserve the existing UptimeTracker so TotalOnline accumulates correctly.
			// hb.Stream is a fresh Stream with no UptimeTracker; carry the old one over.
			oldTracker := rec.GetUptimeTracker()
			rec.HeartbeatStream = hb.Stream
			if oldTracker != nil {
				rec.HeartbeatStream.UptimeTracker = oldTracker
			} else {
				rec.HeartbeatStream.UptimeTracker = &UptimeTracker{FirstSeen: time.Now().UTC()}
			}
			rec.HeartbeatStream.UptimeTracker.RecordHeartbeat()
			rec.LastScore = hb.Score
			logger.Info().Msg("A new node is updated : " + pid.String())
		} else {
			// First heartbeat from this peer: create the record with a fresh tracker.
			tracker := &UptimeTracker{FirstSeen: time.Now().UTC()}
			tracker.RecordHeartbeat()
			hb.Stream.UptimeTracker = tracker
			streams[*pid] = &StreamRecord[T]{
				DID:             hb.DID,
				HeartbeatStream: hb.Stream,
				LastScore:       hb.Score,
			}
			logger.Info().Msg("A new node is subscribed : " + pid.String())
		}
		ix.StreamMU.Unlock()
		// Enrich hb.DID before calling the hook: nodes never set hb.DID directly;
		// extract it from the embedded signed PeerRecord if available, then fall
		// back to the DID stored by handleNodePublish in the stream record.
		if hb.DID == "" && len(hb.Record) > 0 {
			var partial struct {
				DID string `json:"did"`
			}
			if json.Unmarshal(hb.Record, &partial) == nil && partial.DID != "" {
				hb.DID = partial.DID
			}
		}
		if hb.DID == "" {
			ix.StreamMU.RLock()
			if rec, ok := streams[*pid]; ok {
				hb.DID = rec.DID
			}
			ix.StreamMU.RUnlock()
		}
		if ix.AfterHeartbeat != nil && hb.DID != "" {
			// Fired on its own goroutine so a slow hook can't stall the loop.
			go ix.AfterHeartbeat(hb)
		}
		// Send response back to the node (bidirectional heartbeat).
		if ix.BuildHeartbeatResponse != nil {
			if resp := ix.BuildHeartbeatResponse(s.Conn().RemotePeer(), hb.Need, hb.Challenges, hb.ChallengeDID, hb.Referent, hb.Record); resp != nil {
				// Bounded write so a stalled peer can't block the loop forever;
				// deadline is cleared afterwards for the next decode.
				// NOTE(review): the Encode error is silently dropped — a failed
				// write is only detected on the next decode; consider logging it.
				s.SetWriteDeadline(time.Now().Add(3 * time.Second))
				json.NewEncoder(s).Encode(resp)
				s.SetWriteDeadline(time.Time{})
			}
		}
	}
}
|
|
|
|
func CheckHeartbeat(h host.Host, s network.Stream, dec *json.Decoder, streams map[pp.ID]HeartBeatStreamed, lock *sync.RWMutex, maxNodes int) (*pp.ID, *Heartbeat, error) {
|
|
if len(h.Network().Peers()) >= maxNodes {
|
|
return nil, nil, fmt.Errorf("too many connections, try another indexer")
|
|
}
|
|
var hb Heartbeat
|
|
if err := dec.Decode(&hb); err != nil {
|
|
return nil, nil, err
|
|
}
|
|
_, bpms, latencyScore, _ := getBandwidthChallengeRate(h, s.Conn().RemotePeer(), MinPayloadChallenge+int(rand.Float64()*(MaxPayloadChallenge-MinPayloadChallenge)))
|
|
{
|
|
pid, err := pp.Decode(hb.PeerID)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
uptimeRatio := float64(0)
|
|
age := time.Duration(0)
|
|
lock.Lock()
|
|
if rec, ok := streams[pid]; ok && rec.GetUptimeTracker() != nil {
|
|
uptimeRatio = rec.GetUptimeTracker().UptimeRatio()
|
|
age = rec.GetUptimeTracker().Uptime()
|
|
}
|
|
lock.Unlock()
|
|
// E: measure the indexer's own subnet diversity, not the node's view.
|
|
diversity := getOwnDiversityRate(h)
|
|
// fillRate: fraction of indexer capacity used — higher = more peers trust this indexer.
|
|
fillRate := 0.0
|
|
if maxNodes > 0 {
|
|
fillRate = float64(len(h.Network().Peers())) / float64(maxNodes)
|
|
if fillRate > 1 {
|
|
fillRate = 1
|
|
}
|
|
}
|
|
hb.ComputeIndexerScore(uptimeRatio, bpms, diversity, latencyScore, fillRate)
|
|
// B: dynamic minScore — starts at 20% for brand-new peers, ramps to 80% at 24h.
|
|
minScore := dynamicMinScore(age)
|
|
if hb.Score < minScore {
|
|
return nil, nil, errors.New("not enough trusting value")
|
|
}
|
|
hb.Stream = &Stream{
|
|
Name: hb.Name,
|
|
DID: hb.DID,
|
|
Stream: s,
|
|
Expiry: time.Now().UTC().Add(2 * time.Minute),
|
|
} // here is the long-lived bidirectional heartbeat.
|
|
return &pid, &hb, err
|
|
}
|
|
}
|