package stream

import (
	"context"
	"encoding/json"
	"errors"
	"sync"
	"time"

	"oc-discovery/daemons/node/common"

	oclib "cloud.o-forge.io/core/oc-lib"
	"cloud.o-forge.io/core/oc-lib/dbs"
	"cloud.o-forge.io/core/oc-lib/models/peer"
	"cloud.o-forge.io/core/oc-lib/tools"
	"github.com/libp2p/go-libp2p/core/network"
	pp "github.com/libp2p/go-libp2p/core/peer"
)

// ProtocolObserve is the libp2p protocol for peer connectivity observation.
// The requesting oc-discovery opens a stream to the remote oc-discovery and
// sends an ObserveRequest. The remote side keeps the stream open and writes
// ObserveHeartbeat events back every observeHBInterval.
const ProtocolObserve = "/opencloud/peer/observe/1.0"

// observeHBEventType is used as the common.Event.Type for heartbeat responses.
const observeHBEventType = "/opencloud/peer/observe/heartbeat"

// observeHBInterval is the period between heartbeats on the observed side;
// observeDrainDuration is how long new observations are refused after a
// close-all.
const (
	observeHBInterval    = 30 * time.Second
	observeDrainDuration = 30 * time.Second
)

// observeBatchWindow is the accumulation window before a heartbeat batch is
// flushed to NATS. All peer heartbeats received within this window are grouped
// into a single PEER_OBSERVE_RESPONSE_EVENT, reducing NATS traffic.
const observeBatchWindow = 2 * time.Second

// ObserveRequest is the first (and only) message sent by the observing side
// when opening a ProtocolObserve stream.
type ObserveRequest struct {
	// Close, when true, asks the remote side to stop the heartbeat goroutine
	// and remove the observer from its cache. Used for graceful teardown.
	Close bool `json:"close,omitempty"`
}

// ObserveHeartbeat is sent by the observed side every observeHBInterval.
type ObserveHeartbeat struct {
	State string `json:"state"` // always "online" when actively emitted
}

// ShallowPeer is the minimal peer representation sent by oc-peer in a
// PEER_OBSERVE_EVENT. StreamAddress lets oc-discovery connect without a DB
// lookup; Address carries the NATSAddress (unused here, forwarded as-is).
type ShallowPeer struct {
	ID            string `json:"id"`
	PeerID        string `json:"peer_id"`
	Address       string `json:"address"`
	StreamAddress string `json:"stream_address"`
}

// ObserveCommand is the payload carried by a PEER_OBSERVE_EVENT NATS message
// (from oc-peer).
//
//	Observe  → User + Peers populated
//	Close    → User + PeerIDs + Close=true
//	CloseAll → CloseAll=true (User optional)
type ObserveCommand struct {
	User     string        `json:"user"`
	Peers    []ShallowPeer `json:"peers,omitempty"`
	PeerIDs  []string      `json:"peer_ids,omitempty"`
	Close    bool          `json:"close,omitempty"`
	CloseAll bool          `json:"close_all,omitempty"`
}

// ── observe cache (observed side) ────────────────────────────────────────────

// observeCache tracks running heartbeat goroutines keyed by the observing
// peer's libp2p PeerID string. It is used exclusively on the OBSERVED side.
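//
// A minimal lifecycle sketch (the peer ID literal is hypothetical):
//
//	ctx, cancel := context.WithCancel(context.Background())
//	cache := newObserveCache()
//	cache.set("12D3KooW...", cancel) // replaces and cancels any previous entry
//	go func() { <-ctx.Done() }()     // a heartbeat goroutine would watch ctx
//	cache.cancel("12D3KooW...")      // stops the goroutine and forgets the observer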
type observeCache struct {
	mu      sync.Mutex
	cancels map[string]context.CancelFunc
}

func newObserveCache() *observeCache {
	return &observeCache{cancels: map[string]context.CancelFunc{}}
}

// set registers cancel for pid, cancelling any goroutine previously
// registered for the same observer.
func (c *observeCache) set(pid string, cancel context.CancelFunc) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if old, ok := c.cancels[pid]; ok {
		old() // cancel previous goroutine if any
	}
	c.cancels[pid] = cancel
}

// cancel stops the heartbeat goroutine for pid, if one is running.
func (c *observeCache) cancel(pid string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if fn, ok := c.cancels[pid]; ok {
		fn()
		delete(c.cancels, pid)
	}
}

// cancelAll stops every registered heartbeat goroutine.
func (c *observeCache) cancelAll() {
	c.mu.Lock()
	defer c.mu.Unlock()
	for _, fn := range c.cancels {
		fn()
	}
	c.cancels = map[string]context.CancelFunc{}
}

// delete forgets pid without cancelling; used by a goroutine cleaning up
// after itself.
func (c *observeCache) delete(pid string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.cancels, pid)
}

// ── heartbeat batcher (observing side) ───────────────────────────────────────

// heartbeatBatcher accumulates peer_ids from incoming heartbeats over
// observeBatchWindow, then flushes them in a single NATS call.
// Using a map as the backing store deduplicates multiple heartbeats from the
// same peer within the same window (should not happen, but is harmless).
type heartbeatBatcher struct {
	mu    sync.Mutex
	ids   map[string]struct{}
	timer *time.Timer
	flush func(peerIDs []string)
}

func newHeartbeatBatcher(flush func([]string)) *heartbeatBatcher {
	return &heartbeatBatcher{
		ids:   make(map[string]struct{}),
		flush: flush,
	}
}

// add records peerID in the current batch and arms the flush timer if needed.
func (b *heartbeatBatcher) add(peerID string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.ids[peerID] = struct{}{}
	if b.timer == nil {
		b.timer = time.AfterFunc(observeBatchWindow, b.fire)
	}
}

// fire is called by the timer; it drains the batch and invokes flush.
func (b *heartbeatBatcher) fire() {
	b.mu.Lock()
	ids := make([]string, 0, len(b.ids))
	for id := range b.ids {
		ids = append(ids, id)
	}
	b.ids = make(map[string]struct{})
	b.timer = nil
	b.mu.Unlock()
	if len(ids) > 0 {
		b.flush(ids)
	}
}

// flushObserveBatch is the flush function wired into the heartbeatBatcher.
// It emits two NATS messages:
//   - PEER_OBSERVE_RESPONSE_EVENT → consumed by oc-peer (direct channel)
//   - PROPALGATION_EVENT / PB_PROPAGATE → consumed by other oc-discovery nodes
func flushObserveBatch(peerIDs []string) {
	payload, err := json.Marshal(map[string]interface{}{
		"peer_ids": peerIDs,
		"state":    "online",
	})
	if err != nil {
		return
	}
	// Direct notification to oc-peer.
	tools.NewNATSCaller().SetNATSPub(tools.PEER_OBSERVE_RESPONSE_EVENT, tools.NATSResponse{
		FromApp:  "oc-discovery",
		Datatype: tools.PEER,
		Method:   int(tools.PEER_OBSERVE_RESPONSE_EVENT),
		Payload:  payload,
	})
	// Broadcast to other oc-discovery nodes so they can forward to their
	// local oc-peer if needed.
	propPayload, err := json.Marshal(tools.PropalgationMessage{
		DataType: int(tools.PEER),
		Action:   tools.PB_PROPAGATE,
		Payload:  payload,
	})
	if err != nil {
		return
	}
	tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{
		FromApp:  "oc-discovery",
		Datatype: tools.PEER,
		Method:   int(tools.PROPALGATION_EVENT),
		Payload:  propPayload,
	})
}

// ── incoming observe handler (observed side) ─────────────────────────────────

// handleIncomingObserve is registered as the ProtocolObserve stream handler.
// It is called when a remote peer opens an observe stream to us.
// The function validates the caller, hands the stream to a request-watcher
// goroutine, starts the heartbeat goroutine, and returns immediately; the
// spawned goroutines own the stream.
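//
// Registration is a one-time wiring at service start; a sketch (only
// SetStreamHandler is libp2p API, the surrounding setup is assumed):
//
//	s.Host.SetStreamHandler(ProtocolObserve, func(st network.Stream) {
//		if err := s.handleIncomingObserve(st); err != nil {
//			oclib.GetLogger().Warn().Err(err).Msg("[observe] incoming observe rejected")
//		}
//	})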
func (s *StreamService) handleIncomingObserve(rawStream network.Stream) error {
	log := oclib.GetLogger()
	remotePeerID := rawStream.Conn().RemotePeer().String()
	addr := rawStream.Conn().RemoteMultiaddr().String()
	ad, err := pp.AddrInfoFromString(addr + "/p2p/" + remotePeerID)
	if err != nil {
		log.Warn().Str("addr", addr).Err(err).Msg("[observe] invalid observer address")
		return err
	}
	// Drain mode: reject any new observations for observeDrainDuration after
	// a close-all.
	s.drainMu.RLock()
	draining := !s.drainUntil.IsZero() && time.Now().Before(s.drainUntil)
	s.drainMu.RUnlock()
	if draining {
		rawStream.Close()
		return errors.New("draining: refusing new observations")
	}
	// Guard: the requesting peer must not be blacklisted.
	did := ""
	access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil)
	res := access.Search(&dbs.Filters{
		And: map[string][]dbs.Filter{
			"peer_id": {{Operator: dbs.EQUAL.String(), Value: remotePeerID}},
		},
	}, "", false, 0, 1)
	if len(res.Data) > 0 {
		p := res.Data[0].(*peer.Peer)
		did = p.GetID()
		if p.Relation == peer.BLACKLIST { // || p.Relation == peer.SELF
			rawStream.Close()
			return errors.New("refusing observe request from blacklisted peer")
		}
	}
	// Replace any existing heartbeat goroutine for this observer.
	ctx, cancel := context.WithCancel(context.Background())
	s.observeCache.set(remotePeerID, cancel)
	// Watch the stream for ObserveRequest messages. The initial request
	// (Close=false) is consumed here; a later Close=true, or any read error
	// (including EOF when the observer disconnects), stops the heartbeat
	// goroutine via ctx.
	go func() {
		dec := json.NewDecoder(rawStream)
		for {
			var req ObserveRequest
			if err := dec.Decode(&req); err != nil || req.Close {
				cancel()
				return
			}
		}
	}()
	go func() {
		defer rawStream.Close()
		defer cancel()
		defer s.observeCache.delete(remotePeerID)
		ticker := time.NewTicker(observeHBInterval)
		defer ticker.Stop()
		hbPayload, _ := json.Marshal(ObserveHeartbeat{State: "online"})
		evt := common.NewEvent(observeHBEventType, s.Host.ID().String(), nil, "", hbPayload)
		if evt == nil {
			return
		}
		// sendHB (re)opens a stream back to the observer and writes one
		// heartbeat event. It returns false when the write fails, which
		// terminates the goroutine; the deferred calls above then purge this
		// observer from the cache.
		sendHB := func() bool {
			var err error
			if s.Streams, err = common.TempStream(s.Host, *ad, ProtocolObserve, did, s.Streams, protocols, &s.Mu); err != nil {
				return true // could not reach the observer this round; retry on the next tick
			}
			stream := s.Streams[ProtocolObserve][ad.ID]
			// Bound the write so a stalled observer cannot block the loop;
			// the deadline is set on the stream actually written to.
			stream.Stream.SetWriteDeadline(time.Now().Add(5 * time.Second))
			defer stream.Stream.SetWriteDeadline(time.Time{})
			if err := json.NewEncoder(stream.Stream).Encode(evt); err != nil {
				// Moderate connectivity event: the observer is unreachable.
				log.Info().
					Str("observer", remotePeerID).
					Err(err).
					Msg("[observe] heartbeat write failed; purging observer from cache")
				return false
			}
			return true
		}
		if !sendHB() {
			return
		}
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				if !sendHB() {
					return
				}
			}
		}
	}()
	return nil
}

// ── heartbeat receiver (observing side) ──────────────────────────────────────

// handleObserveHeartbeat is called by readLoop when a heartbeat event arrives
// on an outgoing ProtocolObserve stream. It forwards the peer_id to NATS.
// Batching through hbBatcher is currently disabled; each heartbeat is flushed
// individually.
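//
// If batching is re-enabled, the batcher would be constructed once with
// flushObserveBatch and fed here instead (a sketch; the hbBatcher field is
// assumed from the commented-out call below):
//
//	ps.hbBatcher = newHeartbeatBatcher(flushObserveBatch)
//	// ...then, per heartbeat:
//	ps.hbBatcher.add(evt.From)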
func (ps *StreamService) handleObserveHeartbeat(evt *common.Event) error {
	// Batching is bypassed for now: each heartbeat is flushed immediately.
	// ps.hbBatcher.add(evt.From)
	flushObserveBatch([]string{evt.From})
	return nil
}

// ── user→peer index (ref-counted observe management) ─────────────────────────

// userPeerIndex tracks which users are observing which peers.
// A libp2p observe stream is kept open as long as at least one user watches
// the peer; it is closed only when the last user stops.
type userPeerIndex struct {
	mu    sync.Mutex
	index map[string]map[string]struct{} // user → set of peer_id strings
}

func newUserPeerIndex() *userPeerIndex {
	return &userPeerIndex{index: map[string]map[string]struct{}{}}
}

// add registers user as an observer of peerID.
// It returns true if peerID was not yet observed by any user (first observer).
func (u *userPeerIndex) add(user, peerID string) (isFirst bool) {
	u.mu.Lock()
	defer u.mu.Unlock()
	// Count observers for peerID across all users before adding.
	total := 0
	for _, peers := range u.index {
		if _, ok := peers[peerID]; ok {
			total++
		}
	}
	if u.index[user] == nil {
		u.index[user] = map[string]struct{}{}
	}
	u.index[user][peerID] = struct{}{}
	return total == 0
}

// remove unregisters user from peerID.
// It returns true if no user is observing peerID anymore (last observer removed).
func (u *userPeerIndex) remove(user, peerID string) (isLast bool) {
	u.mu.Lock()
	defer u.mu.Unlock()
	delete(u.index[user], peerID)
	if len(u.index[user]) == 0 {
		delete(u.index, user)
	}
	for _, peers := range u.index {
		if _, ok := peers[peerID]; ok {
			return false
		}
	}
	return true
}

// removeUser removes all entries for user and returns the peer_ids that now
// have no remaining observers (i.e., those whose streams should be closed).
func (u *userPeerIndex) removeUser(user string) []string {
	u.mu.Lock()
	defer u.mu.Unlock()
	watched := u.index[user]
	delete(u.index, user)
	var orphans []string
	for peerID := range watched {
		found := false
		for _, peers := range u.index {
			if _, ok := peers[peerID]; ok {
				found = true
				break
			}
		}
		if !found {
			orphans = append(orphans, peerID)
		}
	}
	return orphans
}

// ── NATS command handler (observing side) ────────────────────────────────────

// HandleObserveNATSCommand processes a PEER_OBSERVE_EVENT received from oc-peer.
func (ps *StreamService) HandleObserveNATSCommand(resp tools.NATSResponse) {
	log := oclib.GetLogger()
	var cmd ObserveCommand
	if err := json.Unmarshal(resp.Payload, &cmd); err != nil {
		log.Warn().Err(err).Msg("[observe] failed to unmarshal ObserveCommand")
		return
	}
	if cmd.CloseAll {
		log.Info().Msg("[observe] close-all received via NATS")
		ps.CloseAllObserves()
		return
	}
	if cmd.Close {
		for _, peerID := range cmd.PeerIDs {
			if isLast := ps.observeUsers.remove(cmd.User, peerID); isLast {
				if err := ps.closeObserveStream(peerID); err != nil {
					log.Warn().Str("peer", peerID).Err(err).Msg("[observe] closeObserveStream failed")
				}
			}
		}
		return
	}
	// Observe: open streams for any new peer, using the address from the payload.
	for _, p := range cmd.Peers {
		if isFirst := ps.observeUsers.add(cmd.User, p.PeerID); isFirst {
			if err := ps.openObserveStream(p); err != nil {
				// Roll back the index entry so the next NATS command can retry.
				ps.observeUsers.remove(cmd.User, p.PeerID)
				log.Warn().Str("peer", p.PeerID).Err(err).Msg("[observe] openObserveStream failed")
			}
		}
	}
}
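
// For reference, the three ObserveCommand shapes handled above look like this
// on the wire (all values are illustrative):
//
//	{"user":"u1","peers":[{"id":"...","peer_id":"12D3KooW...","address":"...","stream_address":"/ip4/10.0.0.2/tcp/4001/p2p/12D3KooW..."}]}
//	{"user":"u1","peer_ids":["12D3KooW..."],"close":true}
//	{"close_all":true}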
// ── outgoing observe management (observing side) ─────────────────────────────

// OpenObserveStream is the exported variant for inter-discovery propagation
// (no user context available). It bypasses the user index and opens the stream
// directly if not already open.
func (ps *StreamService) OpenObserveStream(p ShallowPeer) error {
	return ps.openObserveStream(p)
}

// CloseObserveStream is the exported variant for inter-discovery propagation.
func (ps *StreamService) CloseObserveStream(toPeerID string) error {
	return ps.closeObserveStream(toPeerID)
}

// openObserveStream opens a ProtocolObserve stream to p.
// It uses p.StreamAddress directly and falls back to a DB then DHT lookup if
// the address is empty.
func (ps *StreamService) openObserveStream(p ShallowPeer) error {
	streamAddr := p.StreamAddress
	access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil)
	res := access.Search(&dbs.Filters{
		And: map[string][]dbs.Filter{
			"peer_id": {{Operator: dbs.EQUAL.String(), Value: p.PeerID}},
		},
	}, "", false, 0, 1)
	if streamAddr == "" {
		// Fallback: DB then DHT.
		if len(res.Data) > 0 {
			streamAddr = res.Data[0].(*peer.Peer).StreamAddress
		} else if peers, err := ps.Node.GetPeerRecord(context.Background(), p.PeerID); err == nil && len(peers) > 0 {
			streamAddr = peers[0].StreamAddress
		}
	}
	if len(res.Data) > 0 && res.Data[0].(*peer.Peer).Relation == peer.SELF {
		return errors.New("can't open an observe stream to self")
	}
	if streamAddr == "" {
		return nil // address could not be resolved; silently skip
	}
	decodedID, err := pp.Decode(p.PeerID)
	if err != nil {
		return err
	}
	// If a stream already exists, reuse it.
	ps.Mu.RLock()
	_, alreadyOpen := ps.Streams[ProtocolObserve][decodedID]
	ps.Mu.RUnlock()
	if alreadyOpen {
		return nil
	}
	ad, err := pp.AddrInfoFromString(streamAddr)
	if err != nil {
		return err
	}
	if ps.Streams, err = common.TempStream(ps.Host, *ad, ProtocolObserve, p.ID, ps.Streams, protocols, &ps.Mu); err != nil {
		return err
	}
	rawStream := ps.Streams[ProtocolObserve][ad.ID]
	reqPayload, err := json.Marshal(ObserveRequest{Close: false})
	if err != nil {
		return err
	}
	if err := json.NewEncoder(rawStream.Stream).Encode(common.NewEvent(ProtocolObserve, ps.Host.ID().String(), nil, "", reqPayload)); err != nil {
		rawStream.Stream.Close()
		return err
	}
	// Promote the temp stream to a long-lived one: observe streams stay open
	// until explicitly closed, so give them an effectively infinite expiry.
	s := &common.Stream{
		Stream: rawStream.Stream,
		Expiry: time.Now().Add(365 * 24 * time.Hour),
	}
	ps.Mu.Lock()
	if ps.Streams[ProtocolObserve] == nil {
		ps.Streams[ProtocolObserve] = map[pp.ID]*common.Stream{}
	}
	ps.Streams[ProtocolObserve][ad.ID] = s
	ps.Mu.Unlock()
	go ps.readLoop(s, ad.ID, ProtocolObserve, &common.ProtocolInfo{PersistantStream: true})
	return nil
}

// closeObserveStream closes the ProtocolObserve stream to toPeerID and notifies
// the remote side.
func (ps *StreamService) closeObserveStream(toPeerID string) error {
	decodedID, err := pp.Decode(toPeerID)
	if err != nil {
		return err
	}
	ps.Mu.Lock()
	if ps.Streams[ProtocolObserve] != nil {
		if s, ok := ps.Streams[ProtocolObserve][decodedID]; ok {
			// Best-effort notification; the stream is closed regardless.
			_ = json.NewEncoder(s.Stream).Encode(ObserveRequest{Close: true})
			s.Stream.Close()
			delete(ps.Streams[ProtocolObserve], decodedID)
		}
	}
	ps.Mu.Unlock()
	return nil
}

// CloseAllObserves closes every outgoing ProtocolObserve stream, clears the
// user index, and enters drain mode for observeDrainDuration.
func (ps *StreamService) CloseAllObserves() {
	ps.Mu.Lock()
	for _, s := range ps.Streams[ProtocolObserve] {
		_ = json.NewEncoder(s.Stream).Encode(ObserveRequest{Close: true})
		s.Stream.Close()
	}
	delete(ps.Streams, ProtocolObserve)
	ps.Mu.Unlock()

	// Reset the user index so stale ref-counts don't block future opens.
	ps.observeUsers = newUserPeerIndex()

	ps.drainMu.Lock()
	ps.drainUntil = time.Now().Add(observeDrainDuration)
	ps.drainMu.Unlock()
}
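
// For orientation, the observing-side lifecycle driven by the functions above
// (a minimal sketch; the ShallowPeer values are hypothetical):
//
//	p := ShallowPeer{PeerID: "12D3KooW...", StreamAddress: "/ip4/10.0.0.2/tcp/4001/p2p/12D3KooW..."}
//	_ = ps.OpenObserveStream(p)         // open the stream; heartbeats arrive via readLoop
//	_ = ps.CloseObserveStream(p.PeerID) // notify the remote side and drop the stream
//	ps.CloseAllObserves()               // tear everything down, then drain for observeDrainDuration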