package common

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"oc-discovery/conf"
	"strings"
	"sync"
	"time"

	oclib "cloud.o-forge.io/core/oc-lib"
	peer "cloud.o-forge.io/core/oc-lib/models/peer"
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/network"
	pp "github.com/libp2p/go-libp2p/core/peer"
	"github.com/libp2p/go-libp2p/core/protocol"
)

const (
	// longLivedStreamTTL is how long a stream entry stays valid after it was
	// created or last refreshed.
	longLivedStreamTTL = 2 * time.Minute
	// defaultHeartbeatInterval is used when SendHeartbeat receives a
	// non-positive interval.
	defaultHeartbeatInterval = 20 * time.Second
)

// LongLivedStreamRecordedService keeps, per protocol and per peer, a record of
// the streams opened towards this host. StreamRecords is guarded by StreamMU.
type LongLivedStreamRecordedService[T any] struct {
	*LongLivedPubSubService
	StreamRecords    map[protocol.ID]map[pp.ID]*StreamRecord[T]
	StreamMU         sync.RWMutex
	maxNodesConn     int
	isBidirectionnal bool
}

// NewStreamRecordedService builds the service for host h and starts its two
// background loops: record garbage collection every 30 seconds and a snapshot
// log every hour.
func NewStreamRecordedService[T any](h host.Host, maxNodesConn int, isBidirectionnal bool) *LongLivedStreamRecordedService[T] {
	service := &LongLivedStreamRecordedService[T]{
		LongLivedPubSubService: NewLongLivedPubSubService(h),
		StreamRecords:          map[protocol.ID]map[pp.ID]*StreamRecord[T]{},
		maxNodesConn:           maxNodesConn,
		isBidirectionnal:       isBidirectionnal,
	}
	// Garbage collection is needed on every map of long-lived streams; it may
	// be redesigned as a top-level service. StartGC and Snapshot spawn their
	// own goroutine, so no extra `go` is needed here.
	service.StartGC(30 * time.Second)
	service.Snapshot(1 * time.Hour)
	return service
}

// StartGC runs gc every interval in a background goroutine.
func (ix *LongLivedStreamRecordedService[T]) StartGC(interval time.Duration) {
	go func() {
		t := time.NewTicker(interval)
		defer t.Stop()
		for range t.C {
			ix.gc()
		}
	}()
}

// gc evicts every peer whose heartbeat record is stale: its streams are closed
// and removed for all protocols, and an OFFLINE node-activity message is
// published when the node-activity topic is available.
func (ix *LongLivedStreamRecordedService[T]) gc() {
	ix.StreamMU.Lock()
	defer ix.StreamMU.Unlock()

	streams := ix.StreamRecords[ProtocolHeartbeat]
	if streams == nil {
		ix.StreamRecords[ProtocolHeartbeat] = map[pp.ID]*StreamRecord[T]{}
		return
	}

	now := time.Now()
	for pid, rec := range streams {
		// Without heartbeat data the staleness of a record cannot be judged;
		// skip it instead of dereferencing a nil pointer.
		if rec == nil || rec.HeartbeatStream == nil {
			continue
		}
		hb := rec.HeartbeatStream
		expired := now.After(hb.Expiry) || now.Sub(rec.LastSeen) > 2*hb.Expiry.Sub(now)
		if !expired {
			continue
		}

		// Drop the peer from every protocol, not only from the heartbeat map.
		// Deleting from a map while ranging over it is allowed in Go.
		for _, byPeer := range ix.StreamRecords {
			stale := byPeer[pid]
			if stale == nil {
				continue
			}
			if stale.Stream != nil {
				stale.Stream.Close()
			}
			delete(byPeer, pid)
		}

		ix.PubsubMu.Lock()
		if topic := ix.LongLivedPubSubs[TopicPubSubNodeActivity]; topic != nil {
			// Best effort: a payload that cannot be marshalled is not published.
			if b, err := json.Marshal(TopicNodeActivityPub{
				DID:          hb.DID,
				PeerID:       pid.String(),
				NodeActivity: peer.OFFLINE,
			}); err == nil {
				topic.Publish(context.Background(), b)
			}
		}
		ix.PubsubMu.Unlock()
	}
}

// Snapshot logs the DID of every recorded stream every interval, from a
// background goroutine.
func (ix *LongLivedStreamRecordedService[T]) Snapshot(interval time.Duration) {
	go func() {
		logger := oclib.GetLogger()
		t := time.NewTicker(interval)
		defer t.Stop()
		for range t.C {
			for _, inf := range ix.snapshot() {
				if inf == nil {
					continue
				}
				logger.Info().Msg(" -> " + inf.DID)
			}
		}
	}()
}

// -------- Snapshot / Query --------

// snapshot returns the records of all protocols flattened into one slice.
// Only a read lock is taken because nothing is modified.
func (ix *LongLivedStreamRecordedService[T]) snapshot() []*StreamRecord[T] {
	ix.StreamMU.RLock()
	defer ix.StreamMU.RUnlock()

	out := make([]*StreamRecord[T], 0, len(ix.StreamRecords))
	for _, streams := range ix.StreamRecords {
		for _, stream := range streams {
			out = append(out, stream)
		}
	}
	return out
}

// HandleNodeHeartbeat reads one heartbeat from s and, when the sending peer
// already has a heartbeat record, refreshes that record.
//
// NOTE(review): a peer without an existing record is ignored here; confirm
// that records are inserted elsewhere.
func (ix *LongLivedStreamRecordedService[T]) HandleNodeHeartbeat(s network.Stream) {
	pid, hb, err := CheckHeartbeat(ix.Host, s, ix.maxNodesConn)
	if err != nil {
		return
	}

	ix.StreamMU.Lock()
	defer ix.StreamMU.Unlock()

	// The map must be read under the lock: gc mutates it concurrently.
	streams := ix.StreamRecords[ProtocolHeartbeat]
	if streams == nil {
		ix.StreamRecords[ProtocolHeartbeat] = map[pp.ID]*StreamRecord[T]{}
		return
	}

	// If the record was already seen, update its last-seen data.
	rec, ok := streams[*pid]
	if !ok || rec == nil {
		return
	}
	rec.DID = hb.DID
	rec.Stream = s
	rec.HeartbeatStream = hb.Stream
	rec.LastSeen = time.Unix(hb.Timestamp, 0)
}

// CheckHeartbeat decodes one JSON heartbeat from s and returns the peer ID it
// declares together with the decoded heartbeat, whose Stream is guaranteed to
// be non-nil and to reference s. It fails when the host already has maxNodes
// peers or more, when the payload cannot be decoded, or when the declared peer
// ID is invalid; on failure both returned pointers are nil.
//
// NOTE(review): s is closed when this function returns, yet it is stored as
// the long-lived heartbeat stream — confirm the close is intended.
// NOTE(review): the declared peer ID is not compared with the remote peer of
// s — confirm whether it should be.
func CheckHeartbeat(h host.Host, s network.Stream, maxNodes int) (*pp.ID, *Heartbeat, error) {
	// Registered first so the stream is released on the rejection path too.
	defer s.Close()

	if len(h.Network().Peers()) >= maxNodes {
		return nil, nil, fmt.Errorf("too many connections, try another indexer")
	}

	var hb Heartbeat
	if err := json.NewDecoder(s).Decode(&hb); err != nil {
		return nil, nil, err
	}
	pid, err := pp.Decode(hb.PeerID)
	if err != nil {
		return nil, nil, fmt.Errorf("decoding heartbeat peer id %q: %w", hb.PeerID, err)
	}

	// SendHeartbeat does not fill the "stream" field, so it is usually absent.
	if hb.Stream == nil {
		hb.Stream = &Stream{
			DID:    hb.DID,
			Expiry: time.Now().UTC().Add(longLivedStreamTTL),
		}
	}
	hb.Stream.Stream = s // here is the long-lived bidirectional heartbeat.
	return &pid, &hb, nil
}

// StreamRecord is what the service remembers about one peer for one protocol.
type StreamRecord[T any] struct {
	DID             string
	HeartbeatStream *Stream
	Stream          network.Stream
	Record          T
	LastSeen        time.Time // to check expiry
}

// Stream couples a live libp2p stream with the DID of its owner and the
// instant after which it is considered expired.
type Stream struct {
	DID string `json:"did"`
	// Stream is a live handle: it is excluded from JSON because an interface
	// value cannot be decoded back from the wire.
	Stream network.Stream `json:"-"`
	Expiry time.Time      `json:"expiry"`
}

// closeStream closes the underlying libp2p stream; it is a no-op on a nil
// receiver or when no stream is attached.
func (s *Stream) closeStream() {
	if s == nil || s.Stream == nil {
		return
	}
	// Best effort: there is nothing useful to do when closing fails.
	_ = s.Stream.Close()
}

// NewStream wraps s for the given DID with an expiry two minutes from now.
// record is accepted for signature compatibility and is not stored.
func NewStream[T any](s network.Stream, did string, record T) *Stream {
	return &Stream{
		DID:    did,
		Stream: s,
		Expiry: time.Now().UTC().Add(longLivedStreamTTL),
	}
}

// ProtocolStream indexes streams by protocol, then by peer.
//
// NOTE(review): it is a plain map with no locking while it appears to be used
// from several goroutines (heartbeat loop, stream handlers) — confirm.
type ProtocolStream map[protocol.ID]map[pp.ID]*Stream

// Get returns the per-peer map of protocol, creating it when missing.
func (ps ProtocolStream) Get(protocol protocol.ID) map[pp.ID]*Stream {
	if ps[protocol] == nil {
		ps[protocol] = map[pp.ID]*Stream{}
	}
	return ps[protocol]
}

// Add stores s for peerID under protocol, creating the per-peer map when
// missing. A nil peerID is a no-op; a nil s with a non-nil peerID is an error.
func (ps ProtocolStream) Add(protocol protocol.ID, peerID *pp.ID, s *Stream) error {
	streams := ps.Get(protocol)
	if peerID == nil {
		return nil
	}
	if s == nil {
		return errors.New("unable to add stream : stream missing")
	}
	streams[*peerID] = s
	return nil
}

// Delete closes and removes the stream of peerID under protocol. With a nil
// peerID it closes every stream of that protocol and removes the protocol
// itself. Other protocols are never touched, and an unknown peer is a no-op.
func (ps ProtocolStream) Delete(protocol protocol.ID, peerID *pp.ID) {
	streams, ok := ps[protocol]
	if !ok {
		return
	}
	if peerID != nil {
		streams[*peerID].closeStream()
		delete(streams, *peerID)
		return
	}
	for _, s := range streams {
		s.closeStream()
	}
	delete(ps, protocol)
}

const (
	ProtocolPublish = "/opencloud/record/publish/1.0"
	ProtocolGet     = "/opencloud/record/get/1.0"
)

// StaticIndexers lists the indexers this node managed to connect to.
var StaticIndexers []*pp.AddrInfo = []*pp.AddrInfo{}

// StreamIndexers holds the privileged streams opened with those indexers.
var StreamIndexers ProtocolStream = ProtocolStream{}

// ConnectToIndexers connects h to at most maxIndexer of the comma-separated
// indexer addresses found in the configuration, registers the publish, get
// and heartbeat protocols with each of them, then starts the heartbeat loop.
// It panics when no indexer could be reached.
func ConnectToIndexers(h host.Host, minIndexer int, maxIndexer int, myPID pp.ID) {
	logger := oclib.GetLogger()
	ctx := context.Background()

	addresses := strings.Split(conf.GetConfig().IndexerAddresses, ",")
	if len(addresses) > maxIndexer {
		addresses = addresses[0:maxIndexer]
	}

	for _, indexerAddr := range addresses {
		indexerAddr = strings.TrimSpace(indexerAddr)
		if indexerAddr == "" {
			continue
		}
		ad, err := pp.AddrInfoFromString(indexerAddr)
		if err != nil {
			// An event must be terminated with Msg, otherwise nothing is logged.
			logger.Err(err).Msg("invalid indexer address " + indexerAddr)
			continue
		}
		if h.Network().Connectedness(ad.ID) != network.Connected {
			if err := h.Connect(ctx, *ad); err != nil {
				logger.Err(err).Msg("unable to connect to indexer " + indexerAddr)
				continue
			}
		}
		StaticIndexers = append(StaticIndexers, ad)
		// Set up privileged streams with the indexer.
		for _, proto := range []protocol.ID{ProtocolPublish, ProtocolGet, ProtocolHeartbeat} {
			AddStreamProtocol(nil, StreamIndexers, h, proto, ad.ID, myPID, nil)
		}
	}

	if len(StaticIndexers) == 0 {
		panic("can't run a node with no indexers")
	}
	if len(StaticIndexers) < minIndexer {
		// TODO : ask for unknown indexer.
	}
	// Your indexer is just like a node for the next indexer.
	SendHeartbeat(ctx, ProtocolHeartbeat, h, StreamIndexers, StaticIndexers, defaultHeartbeatInterval)
}

// AddStreamProtocol registers proto between this host and peer id in protoS
// and returns protoS (a fresh map when protoS was nil).
//
// When mypid > id this host is the dialing side: an existing entry only gets
// its expiry refreshed, otherwise a new stream is opened and handed to
// onStreamCreated. When mypid <= id the callback is installed as the stream
// handler of proto instead. A nil onStreamCreated defaults to storing the
// stream in protoS under (proto, id) with a two-minute expiry. A nil ctx
// defaults to context.Background().
//
// It panics when the outgoing stream cannot be opened.
//
// NOTE(review): as a handler, the default callback records any inbound stream
// under the configured id — confirm that is intended.
func AddStreamProtocol(ctx *context.Context, protoS ProtocolStream, h host.Host, proto protocol.ID, id pp.ID, mypid pp.ID, onStreamCreated *func(network.Stream)) ProtocolStream {
	if protoS == nil {
		protoS = ProtocolStream{}
	}
	// Create the per-peer map on both branches: the default callback may run
	// later as a stream handler and must never write into a nil map.
	streams := protoS.Get(proto)

	if onStreamCreated == nil {
		store := func(s network.Stream) {
			// Get re-creates the per-peer map if it was deleted meanwhile.
			protoS.Get(proto)[id] = &Stream{
				Stream: s,
				Expiry: time.Now().Add(longLivedStreamTTL),
			}
		}
		onStreamCreated = &store
	}
	handle := *onStreamCreated

	if mypid <= id {
		h.SetStreamHandler(proto, handle)
		return protoS
	}

	if existing := streams[id]; existing != nil {
		existing.Expiry = time.Now().Add(longLivedStreamTTL)
		return protoS
	}

	dialCtx := context.Background()
	if ctx != nil {
		dialCtx = *ctx
	}
	s, err := h.NewStream(dialCtx, id, proto)
	if err != nil {
		panic(err.Error())
	}
	handle(s)
	return protoS
}

// Heartbeat is the JSON payload exchanged on ProtocolHeartbeat.
type Heartbeat struct {
	Stream    *Stream `json:"stream"`
	DID       string  `json:"did"`
	PeerID    string  `json:"peer_id"`
	Timestamp int64   `json:"timestamp"`
}

type HeartbeatInfo []struct {
	Info []byte `json:"info"`
}

const ProtocolHeartbeat = "/opencloud/heartbeat/1.0"

// SendHeartbeat starts a goroutine that sends a heartbeat to every peer each
// interval until ctx is done.
//
// interval is normalised before use: a non-positive value falls back to 20
// seconds, and a value below one millisecond is read as a number of seconds,
// because existing callers pass a bare count such as 20.
//
// It panics when the node ID cannot be generated.
func SendHeartbeat(ctx context.Context, proto protocol.ID, h host.Host, ps ProtocolStream, peers []*pp.AddrInfo, interval time.Duration) {
	peerID, err := oclib.GenerateNodeID()
	if err != nil {
		panic(fmt.Sprintf("can't heartbeat daemon failed to start: %v", err))
	}

	switch {
	case interval <= 0:
		interval = defaultHeartbeatInterval
	case interval < time.Millisecond:
		interval *= time.Second
	}

	go func() {
		t := time.NewTicker(interval)
		defer t.Stop()
		for {
			select {
			case <-t.C:
				hb := Heartbeat{
					DID:       peerID,
					PeerID:    h.ID().String(),
					Timestamp: time.Now().Unix(),
				}
				for _, ix := range peers {
					if ix == nil {
						continue
					}
					// Best effort: a failed send is retried on the next tick.
					_ = sendHeartbeat(ctx, h, proto, ix, hb, ps, interval)
				}
			case <-ctx.Done():
				return
			}
		}
	}()
}

// sendHeartbeat writes hb as JSON to peer p over proto, (re)connecting and
// (re)opening the stream when needed. Connecting and opening the stream are
// bounded by three times interval. On a write failure the stream is closed
// and cleared so that the next call opens a new one.
//
// NOTE(review): when ps holds no stream at all for proto an error is returned
// and none is created — confirm that is intended.
func sendHeartbeat(ctx context.Context, h host.Host, proto protocol.ID, p *pp.AddrInfo, hb Heartbeat, ps ProtocolStream, interval time.Duration) error {
	streams := ps.Get(proto)
	if len(streams) == 0 {
		return errors.New("no stream for protocol heartbeat founded")
	}

	ctxTTL, cancel := context.WithTimeout(ctx, 3*interval)
	defer cancel()

	pss, exists := streams[p.ID]

	// Connect if necessary.
	if h.Network().Connectedness(p.ID) != network.Connected {
		if err := h.Connect(ctxTTL, *p); err != nil {
			return fmt.Errorf("connecting to %s: %w", p.ID, err)
		}
		exists = false // the stream will have to be recreated
	}

	// Create the stream when it is missing or closed.
	if !exists || pss == nil || pss.Stream == nil {
		// Release the previous stream, if any, before replacing it.
		pss.closeStream()
		s, err := h.NewStream(ctxTTL, p.ID, proto)
		if err != nil {
			return err
		}
		pss = &Stream{
			Stream: s,
			Expiry: time.Now().UTC().Add(longLivedStreamTTL),
		}
		streams[p.ID] = pss
	}

	// Send the heartbeat.
	if err := json.NewEncoder(pss.Stream).Encode(&hb); err != nil {
		pss.Stream.Close()
		pss.Stream = nil // will be recreated on the next tick
		return err
	}
	pss.Expiry = time.Now().UTC().Add(longLivedStreamTTL)
	return nil
}

/*
func SearchPeer(search string) ([]*peer.Peer, error) {
	ps := []*peer.Peer{}
	access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil)
	peers := access.Search(nil, search, false)
	if len(peers.Data) == 0 {
		return ps, errors.New("no self available")
	}
	for _, p := range peers.Data {
		ps = append(ps, p.(*peer.Peer))
	}
	return ps, nil
}
*/