package common

import (
	"context"
	"encoding/json"
	"fmt"
	"sync"
	"time"

	oclib "cloud.o-forge.io/core/oc-lib"
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/network"
	pp "github.com/libp2p/go-libp2p/core/peer"
	"github.com/libp2p/go-libp2p/core/protocol"
)

// Protocol IDs of the record streams.
// NOTE(review): judging by the names, ProtocolPublish carries record
// publications and ProtocolGet carries record lookups — confirm against the
// handlers registered for them.
const (
	ProtocolPublish = "/opencloud/record/publish/1.0"
	ProtocolGet     = "/opencloud/record/get/1.0"
)

// ProtocolHeartbeat is the protocol ID of the heartbeat stream.
const ProtocolHeartbeat = "/opencloud/heartbeat/1.0"

// ProtocolWitnessQuery is opened by a node to ask a peer what it thinks of a given indexer.
const ProtocolWitnessQuery = "/opencloud/witness/1.0"

// ProtocolSearchPeer is opened by a node toward one of its indexers to start a
// distributed peer search. The stream stays open; the indexer writes
// SearchPeerResult JSON objects as results arrive from the GossipSub mesh.
const ProtocolSearchPeer = "/opencloud/search/peer/1.0"

// ProtocolSearchPeerResponse is opened by an indexer back toward the emitting
// indexer to deliver search results found in its referencedNodes.
const ProtocolSearchPeerResponse = "/opencloud/search/peer/response/1.0"

// ProtocolBandwidthProbe is a dedicated short-lived stream used exclusively
// for bandwidth/latency measurement. The handler echoes any bytes it receives.
// All nodes and indexers register this handler so peers can measure them.
const ProtocolBandwidthProbe = "/opencloud/probe/1.0" type Stream struct { Name string `json:"name"` DID string `json:"did"` Stream network.Stream Expiry time.Time `json:"expiry"` UptimeTracker *UptimeTracker } func (s *Stream) GetUptimeTracker() *UptimeTracker { return s.UptimeTracker } func NewStream[T interface{}](s network.Stream, did string, record T) *Stream { return &Stream{ DID: did, Stream: s, Expiry: time.Now().UTC().Add(2 * time.Minute), } } type StreamRecord[T interface{}] struct { DID string HeartbeatStream *Stream Record T LastScore float64 } func (s *StreamRecord[T]) GetUptimeTracker() *UptimeTracker { if s.HeartbeatStream == nil { return nil } return s.HeartbeatStream.UptimeTracker } type ProtocolInfo struct { PersistantStream bool WaitResponse bool TTL time.Duration } func TempStream(h host.Host, ad pp.AddrInfo, proto protocol.ID, did string, streams ProtocolStream, pts map[protocol.ID]*ProtocolInfo, mu *sync.RWMutex) (ProtocolStream, error) { expiry := 2 * time.Second if pts[proto] != nil { expiry = pts[proto].TTL } ctxTTL, cancelTTL := context.WithTimeout(context.Background(), expiry) defer cancelTTL() if h.Network().Connectedness(ad.ID) != network.Connected { if err := h.Connect(ctxTTL, ad); err != nil { return streams, err } } if streams[proto] != nil && streams[proto][ad.ID] != nil { return streams, nil } else if s, err := h.NewStream(ctxTTL, ad.ID, proto); err == nil { mu.Lock() if streams[proto] == nil { streams[proto] = map[pp.ID]*Stream{} } mu.Unlock() time.AfterFunc(expiry, func() { mu.Lock() delete(streams[proto], ad.ID) mu.Unlock() }) mu.Lock() streams[proto][ad.ID] = &Stream{ DID: did, Stream: s, Expiry: time.Now().UTC().Add(expiry), } mu.Unlock() return streams, nil } else { return streams, err } } func sendHeartbeat(ctx context.Context, h host.Host, proto protocol.ID, p *pp.AddrInfo, hb Heartbeat, ps ProtocolStream, interval time.Duration) (*HeartbeatResponse, time.Duration, error) { logger := oclib.GetLogger() if ps[proto] == nil { 
ps[proto] = map[pp.ID]*Stream{} } streams := ps[proto] pss, exists := streams[p.ID] ctxTTL, cancel := context.WithTimeout(ctx, 3*interval) defer cancel() if h.Network().Connectedness(p.ID) != network.Connected { if err := h.Connect(ctxTTL, *p); err != nil { logger.Err(err) return nil, 0, err } exists = false } if !exists || pss.Stream == nil { logger.Info().Msg("New Stream engaged as Heartbeat " + fmt.Sprintf("%v", proto) + " " + p.ID.String()) s, err := h.NewStream(ctx, p.ID, proto) if err != nil { fmt.Println(s, err, p.ID) logger.Err(err) return nil, 0, err } pss = &Stream{ Stream: s, Expiry: time.Now().UTC().Add(2 * time.Minute), } streams[p.ID] = pss } sentAt := time.Now() if err := json.NewEncoder(pss.Stream).Encode(&hb); err != nil { pss.Stream.Close() pss.Stream = nil return nil, 0, err } pss.Expiry = time.Now().UTC().Add(2 * time.Minute) // Try to read a response (indexers that support bidirectional heartbeat respond). pss.Stream.SetReadDeadline(time.Now().Add(5 * time.Second)) var resp HeartbeatResponse rtt := time.Since(sentAt) if err := json.NewDecoder(pss.Stream).Decode(&resp); err == nil { rtt = time.Since(sentAt) pss.Stream.SetReadDeadline(time.Time{}) return &resp, rtt, nil } pss.Stream.SetReadDeadline(time.Time{}) return nil, rtt, nil } func AddStreamProtocol(ctx *context.Context, protoS ProtocolStream, h host.Host, proto protocol.ID, id pp.ID, mypid pp.ID, force bool, onStreamCreated *func(network.Stream)) ProtocolStream { logger := oclib.GetLogger() if onStreamCreated == nil { f := func(s network.Stream) { protoS[proto][id] = &Stream{ Stream: s, Expiry: time.Now().UTC().Add(2 * time.Minute), } } onStreamCreated = &f } f := *onStreamCreated if mypid > id || force { if ctx == nil { c := context.Background() ctx = &c } if protoS[proto] == nil { protoS[proto] = map[pp.ID]*Stream{} } if protoS[proto][id] != nil { protoS[proto][id].Expiry = time.Now().Add(2 * time.Minute) } else { logger.Info().Msg("NEW STREAM Generated" + fmt.Sprintf("%v", proto) + " " 
+ id.String()) s, err := h.NewStream(*ctx, id, proto) if err != nil { panic(err.Error()) } f(s) } } return protoS }