diff --git a/daemons/node/node.go b/daemons/node/node.go index ef913f6..9abd692 100644 --- a/daemons/node/node.go +++ b/daemons/node/node.go @@ -73,8 +73,6 @@ func InitNode(isNode bool, isIndexer bool) (*Node, error) { common.ConnectToIndexers(node.Host, 0, 5, node.PeerID) // TODO : make var to change how many indexers are allowed. logger.Info().Msg("claims my node...") node.claimInfo(conf.GetConfig().Name, conf.GetConfig().Hostname) - logger.Info().Msg("subscribe to node activity...") - node.SubscribeToNodeActivity(node.PS) // now we subscribe to a long run topic named node-activity, to relay message. logger.Info().Msg("subscribe to decentralized search flow...") node.SubscribeToSearch(node.PS) logger.Info().Msg("run garbage collector...") diff --git a/daemons/node/stream/publish.go b/daemons/node/stream/publish.go index 85548b5..0153a65 100644 --- a/daemons/node/stream/publish.go +++ b/daemons/node/stream/publish.go @@ -27,7 +27,7 @@ func (ps *StreamService) PublishResources(dt *tools.DataType, user string, toPee if err != nil { return err } - ps.write(tools.PB_SEARCH, toPeerID, ad, dt, user, resource, ProtocolSearchResource) + ps.write(tools.PB_SEARCH, toPeerID, ad, dt, user, resource, ProtocolSearchResource, p.Data.(*peer.Peer).Relation == peer.PARTNER) } return nil } @@ -55,7 +55,7 @@ func (ps *StreamService) SearchKnownPublishEvent(dt *tools.DataType, user string if err != nil { continue } - ps.write(tools.PB_SEARCH, p.GetID(), ad, dt, user, b, ProtocolSearchResource) + ps.write(tools.PB_SEARCH, p.GetID(), ad, dt, user, b, ProtocolSearchResource, p.(*peer.Peer).Relation == peer.PARTNER) } } return nil @@ -74,7 +74,7 @@ func (ps *StreamService) SearchPartnersPublishEvent(dt *tools.DataType, user str if err != nil { continue } - ps.write(tools.PB_SEARCH, p.GetID(), ad, dt, user, b, ProtocolSearchResource) + ps.write(tools.PB_SEARCH, p.GetID(), ad, dt, user, b, ProtocolSearchResource, true) } } return nil @@ -82,6 +82,31 @@ func (ps *StreamService) SearchPartnersPublishEvent(dt *tools.DataType, user str func (ps *StreamService) ToPartnerPublishEvent( ctx context.Context, action tools.PubSubAction, dt *tools.DataType, user string, payload []byte) error { + if *dt == tools.PEER { + var p peer.Peer + if err := json.Unmarshal(payload, &p); err != nil { + return err + } + ad, err := pp.AddrInfoFromString(p.StreamAddress) + if err != nil { + return err + } + ps.mu.Lock() + defer ps.mu.Unlock() + if p.Relation == peer.PARTNER { + if ps.Streams[ProtocolHeartbeatPartner] == nil { + ps.Streams[ProtocolHeartbeatPartner] = map[pp.ID]*common.Stream{} + } + ps.ConnectToPartner(ad.ID, ad) + } else if ps.Streams[ProtocolHeartbeatPartner] != nil && ps.Streams[ProtocolHeartbeatPartner][ad.ID] != nil { + for _, pids := range ps.Streams { + if pids[ad.ID] != nil { + delete(pids, ad.ID) + } + } + } + return nil + } if peers, err := ps.searchPeer(fmt.Sprintf("%v", peer.PARTNER.EnumIndex())); err != nil { return err } else { @@ -91,7 +116,7 @@ func (ps *StreamService) ToPartnerPublishEvent( if err != nil { continue } - ps.write(action, p.GetID(), ad, dt, user, payload, protocol) + ps.write(action, p.GetID(), ad, dt, user, payload, protocol, true) } } } @@ -105,7 +130,8 @@ func (s *StreamService) write( dt *tools.DataType, user string, payload []byte, - proto protocol.ID) error { + proto protocol.ID, + isAPartner bool) error { logger := oclib.GetLogger() name := action.String() + "#" + peerID.ID.String() @@ -122,6 +148,9 @@ func (s *StreamService) write( // should create a very temp stream ctxTTL, err := context.WithTimeout(context.Background(), 10*time.Second) if err == nil { + if isAPartner { + ctxTTL = context.Background() + } if s.Host.Network().Connectedness(peerID.ID) != network.Connected { _ = s.Host.Connect(ctxTTL, *peerID) str, err := s.Host.NewStream(ctxTTL, peerID.ID, ProtocolHeartbeatPartner) diff --git a/daemons/node/stream/service.go b/daemons/node/stream/service.go index 99b6fe5..0929af3 100644 --- a/daemons/node/stream/service.go +++ b/daemons/node/stream/service.go @@ -79,7 +79,7 @@ func (s *StreamService) HandlePartnerHeartbeat(stream network.Stream) { pid := stream.Conn().RemotePeer() ai, err := pp.AddrInfoFromP2pAddr(stream.Conn().RemoteMultiaddr()) if err == nil { - s.connectToPartner(pid, ai) + s.ConnectToPartner(pid, ai) } } go s.StartGC(30 * time.Second) @@ -99,14 +99,14 @@ func (s *StreamService) connectToPartners() error { if err != nil { continue } - s.connectToPartner(pid, ad) + s.ConnectToPartner(pid, ad) // heartbeat your partner. } // TODO if handle... from partner then HeartBeat back return nil } -func (s *StreamService) connectToPartner(pid pp.ID, ad *pp.AddrInfo) { +func (s *StreamService) ConnectToPartner(pid pp.ID, ad *pp.AddrInfo) { for _, proto := range protocols { f := func(ss network.Stream) { if s.Streams[proto] == nil {