package stream

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"time"

	"oc-discovery/daemons/node/common"

	oclib "cloud.o-forge.io/core/oc-lib"
	"cloud.o-forge.io/core/oc-lib/dbs"
	"cloud.o-forge.io/core/oc-lib/models/peer"
	"cloud.o-forge.io/core/oc-lib/tools"
	"github.com/libp2p/go-libp2p/core/network"
	pp "github.com/libp2p/go-libp2p/core/peer"
	"github.com/libp2p/go-libp2p/core/protocol"
)

// PublishResources sends resource to the single peer identified by toPeerID
// as a tools.PB_SEARCH event on ProtocolSearchResource.
//
// The peer is loaded through an admin request on the PEER collection to obtain
// its stream address and its relation. An error is returned when the peer
// cannot be loaded, when the loaded object is not a *peer.Peer, when its stream
// address cannot be parsed, or when the event cannot be written.
func (ps *StreamService) PublishResources(dt *tools.DataType, user string, toPeerID string, resource []byte) error {
	access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil)
	res := access.LoadOne(toPeerID)
	if res.Err != "" {
		return errors.New(res.Err)
	}
	target, ok := res.Data.(*peer.Peer)
	if !ok || target == nil {
		return fmt.Errorf("peer %q: unexpected data type %T", toPeerID, res.Data)
	}
	ad, err := pp.AddrInfoFromString(target.StreamAddress)
	if err != nil {
		return err
	}
	// There is exactly one recipient, so a write failure is the caller's failure.
	return ps.write(tools.PB_SEARCH, toPeerID, ad, dt, user, resource,
		ProtocolSearchResource, target.Relation == peer.PARTNER)
}

// SearchKnownPublishEvent sends a {"search": search} payload as a
// tools.PB_SEARCH event on ProtocolSearchResource to every peer returned by
// the peer search below.
//
// The broadcast is best-effort: a peer whose stream address cannot be parsed is
// skipped, and a failed write is logged without stopping the loop. Only a
// failing peer lookup or a failing payload encoding is returned as an error.
func (ps *StreamService) SearchKnownPublishEvent(dt *tools.DataType, user string, search string) error {
	access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil)
	// The filter reads NOT(relation == peer.BLACKLIST): blacklisted peers are
	// excluded from the result.
	// NOTE(review): search is also handed to access.Search as its second
	// argument; its exact effect on the query is not visible here.
	peers := access.Search(&dbs.Filters{
		And: map[string][]dbs.Filter{
			"": {{Operator: dbs.NOT.String(), Value: dbs.Filters{
				And: map[string][]dbs.Filter{
					"relation": {{Operator: dbs.EQUAL.String(), Value: peer.BLACKLIST}},
				},
			}}},
		},
	}, search, false)
	if peers.Err != "" {
		return errors.New(peers.Err)
	}
	b, err := json.Marshal(map[string]string{"search": search})
	if err != nil {
		return err
	}
	logger := oclib.GetLogger()
	for _, item := range peers.Data {
		known, ok := item.(*peer.Peer)
		if !ok || known == nil {
			continue
		}
		ad, err := pp.AddrInfoFromString(known.StreamAddress)
		if err != nil {
			continue
		}
		if err := ps.write(tools.PB_SEARCH, item.GetID(), ad, dt, user, b,
			ProtocolSearchResource, known.Relation == peer.PARTNER); err != nil {
			logger.Err(err).Msg("search event not delivered to peer " + item.GetID())
		}
	}
	return nil
}

// SearchPartnersPublishEvent sends a {"search": search} payload as a
// tools.PB_SEARCH event on ProtocolSearchResource to every peer returned by
// searchPeer for the PARTNER relation.
//
// The broadcast is best-effort: a peer whose stream address cannot be parsed is
// skipped, and a failed write is logged without stopping the loop.
func (ps *StreamService) SearchPartnersPublishEvent(dt *tools.DataType, user string, search string) error {
	peers, err := ps.searchPeer(fmt.Sprintf("%v", peer.PARTNER.EnumIndex()))
	if err != nil {
		return err
	}
	b, err := json.Marshal(map[string]string{"search": search})
	if err != nil {
		return err
	}
	logger := oclib.GetLogger()
	for _, p := range peers {
		ad, err := pp.AddrInfoFromString(p.StreamAddress)
		if err != nil {
			continue
		}
		if err := ps.write(tools.PB_SEARCH, p.GetID(), ad, dt, user, b, ProtocolSearchResource, true); err != nil {
			logger.Err(err).Msg("search event not delivered to partner " + p.GetID())
		}
	}
	return nil
}

// ToPartnerPublishEvent handles an event that concerns partners.
//
// When dt is tools.PEER, payload is decoded as a peer.Peer:
//   - if its relation is PARTNER, the heartbeat stream map is created when
//     missing and ConnectToPartner is called for the peer;
//   - otherwise, if a heartbeat stream is registered for the peer, every stream
//     registered for it (under any protocol) is closed and removed.
//
// For any other data type (including a nil dt), payload is forwarded with the
// given action to every PARTNER peer on each entry of protocols. That
// forwarding is best-effort: failed writes are logged and the loop goes on.
//
// ctx is currently unused.
func (ps *StreamService) ToPartnerPublishEvent(
	ctx context.Context, action tools.PubSubAction, dt *tools.DataType, user string, payload []byte) error {
	if dt != nil && *dt == tools.PEER {
		var p peer.Peer
		if err := json.Unmarshal(payload, &p); err != nil {
			return err
		}
		ad, err := pp.AddrInfoFromString(p.StreamAddress)
		if err != nil {
			return err
		}
		ps.mu.Lock()
		defer ps.mu.Unlock()
		switch {
		case p.Relation == peer.PARTNER:
			if ps.Streams[ProtocolHeartbeatPartner] == nil {
				ps.Streams[ProtocolHeartbeatPartner] = map[pp.ID]*common.Stream{}
			}
			// NOTE(review): ConnectToPartner runs with ps.mu held; confirm it
			// does not try to take ps.mu itself.
			ps.ConnectToPartner(ad.ID, ad)
		case ps.Streams[ProtocolHeartbeatPartner] != nil && ps.Streams[ProtocolHeartbeatPartner][ad.ID] != nil:
			// The peer is no longer a partner: release its streams instead of
			// only forgetting them.
			for _, byPeer := range ps.Streams {
				if st := byPeer[ad.ID]; st != nil {
					if st.Stream != nil {
						st.Stream.Close()
					}
					delete(byPeer, ad.ID)
				}
			}
		}
		return nil
	}

	peers, err := ps.searchPeer(fmt.Sprintf("%v", peer.PARTNER.EnumIndex()))
	if err != nil {
		return err
	}
	logger := oclib.GetLogger()
	for _, p := range peers {
		// The address does not depend on the protocol: parse it once per peer.
		ad, err := pp.AddrInfoFromString(p.StreamAddress)
		if err != nil {
			continue
		}
		for _, proto := range protocols {
			if err := ps.write(action, p.GetID(), ad, dt, user, payload, proto, true); err != nil {
				logger.Err(err).Msg("event not delivered to partner " + p.GetID())
			}
		}
	}
	return nil
}

// write encodes an event as JSON on the stream registered for proto and
// peerID, opening a short-lived stream first when none is registered.
//
// The event name is "<action>#<peer id>", or "<action>.<data type>#<peer id>"
// when dt is not nil. did is stored as the DID of any stream opened here.
// isAPartner removes the 10 second deadline used while connecting and opening
// streams.
//
// It returns an error when no stream could be obtained or when encoding fails.
// On an encoding failure the stream is closed and unregistered so that the
// next call opens a fresh one.
func (ps *StreamService) write(
	action tools.PubSubAction,
	did string,
	peerID *pp.AddrInfo,
	dt *tools.DataType,
	user string,
	payload []byte,
	proto protocol.ID,
	isAPartner bool) error {
	name := action.String() + "#" + peerID.ID.String()
	if dt != nil {
		name = action.String() + "." + (*dt).String() + "#" + peerID.ID.String()
	}

	ps.mu.Lock()
	defer ps.mu.Unlock()

	stream, err := ps.tempStream(did, peerID, proto, isAPartner)
	if err != nil {
		return fmt.Errorf("no stream available for protocol %v from PID %s: %w", proto, peerID.ID.String(), err)
	}

	evt := common.NewEvent(name, peerID.ID.String(), dt, user, payload)
	if err := json.NewEncoder(stream.Stream).Encode(evt); err != nil {
		// A stream that failed to carry an event is dropped; keeping it
		// registered would make every later write reuse a closed stream.
		stream.Stream.Close()
		delete(ps.Streams[proto], peerID.ID)
		return fmt.Errorf("writing event %q: %w", name, err)
	}
	return nil
}

// tempStream returns the stream registered for proto and peerID. When none is
// registered it connects to the peer if needed, makes sure a
// ProtocolHeartbeatPartner stream is registered for it, then opens and
// registers a stream for proto. Streams opened here expire 5 seconds after
// creation. The caller must hold ps.mu.
func (ps *StreamService) tempStream(did string, peerID *pp.AddrInfo, proto protocol.ID, isAPartner bool) (*common.Stream, error) {
	const (
		dialTimeout = 10 * time.Second
		streamTTL   = 5 * time.Second
	)

	if ps.Streams[proto] == nil {
		ps.Streams[proto] = map[pp.ID]*common.Stream{}
	}
	if st := ps.Streams[proto][peerID.ID]; st != nil {
		return st, nil
	}

	// Partners get no deadline; everyone else is bounded by dialTimeout. The
	// context only covers connecting and opening the streams.
	ctx := context.Background()
	if !isAPartner {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, dialTimeout)
		defer cancel()
	}

	if ps.Host.Network().Connectedness(peerID.ID) != network.Connected {
		if err := ps.Host.Connect(ctx, *peerID); err != nil {
			return nil, err
		}
	}

	// The heartbeat map may not exist yet when proto is another protocol.
	if ps.Streams[ProtocolHeartbeatPartner] == nil {
		ps.Streams[ProtocolHeartbeatPartner] = map[pp.ID]*common.Stream{}
	}
	if ps.Streams[ProtocolHeartbeatPartner][peerID.ID] == nil {
		hb, err := ps.Host.NewStream(ctx, peerID.ID, ProtocolHeartbeatPartner)
		if err != nil {
			return nil, err
		}
		ps.Streams[ProtocolHeartbeatPartner][peerID.ID] = &common.Stream{
			DID:    did,
			Stream: hb,
			Expiry: time.Now().UTC().Add(streamTTL),
		}
	}
	// When proto is the heartbeat protocol the stream above is the one to use;
	// opening a second one would leak the first.
	if st := ps.Streams[proto][peerID.ID]; st != nil {
		return st, nil
	}

	str, err := ps.Host.NewStream(ctx, peerID.ID, proto)
	if err != nil {
		return nil, err
	}
	st := &common.Stream{
		DID:    did,
		Stream: str,
		Expiry: time.Now().UTC().Add(streamTTL),
	}
	ps.Streams[proto][peerID.ID] = st
	return st, nil
}