diff --git a/daemons/node/common/common_pubsub.go b/daemons/node/common/common_pubsub.go index 4468e56..9ce25fb 100644 --- a/daemons/node/common/common_pubsub.go +++ b/daemons/node/common/common_pubsub.go @@ -19,7 +19,8 @@ type Event struct { Type string `json:"type"` From string `json:"from"` // peerID - User string + User string + Groups []string DataType int64 `json:"datatype"` Timestamp int64 `json:"ts"` diff --git a/daemons/node/nats.go b/daemons/node/nats.go index 138ac31..969293e 100644 --- a/daemons/node/nats.go +++ b/daemons/node/nats.go @@ -117,7 +117,7 @@ func ListenNATS(n *Node) { if err := json.Unmarshal(resp.Payload, &m); err == nil { peers, _ := n.GetPeerRecord(context.Background(), m.PeerID) for _, p := range peers { - n.StreamService.PublishCommon(&resp.Datatype, resp.User, + n.StreamService.PublishCommon(&resp.Datatype, resp.User, resp.Groups, p.PeerID, proto, resp.Payload) } } @@ -126,7 +126,7 @@ func ListenNATS(n *Node) { fmt.Println(n.StreamService.ToPartnerPublishEvent( context.Background(), propalgation.Action, - dt, resp.User, + dt, resp.User, resp.Groups, propalgation.Payload, )) case tools.PB_CONSIDERS: @@ -137,7 +137,7 @@ func ListenNATS(n *Node) { for _, p := range m.PeerIDs { peers, _ := n.GetPeerRecord(context.Background(), p) for _, pp := range peers { - n.StreamService.PublishCommon(&resp.Datatype, resp.User, + n.StreamService.PublishCommon(&resp.Datatype, resp.User, resp.Groups, pp.PeerID, stream.ProtocolConsidersResource, resp.Payload) } } @@ -150,7 +150,7 @@ func ListenNATS(n *Node) { if err := json.Unmarshal(propalgation.Payload, &m); err == nil && m.OriginID != "" { peers, _ := n.GetPeerRecord(context.Background(), m.OriginID) for _, p := range peers { - n.StreamService.PublishCommon(nil, resp.User, + n.StreamService.PublishCommon(nil, resp.User, resp.Groups, p.PeerID, stream.ProtocolConsidersResource, propalgation.Payload) } } @@ -166,11 +166,11 @@ func ListenNATS(n *Node) { n.StreamService.Mu.Lock() if n.StreamService.Streams[stream.ProtocolSendPlanner] != nil { for pid := range n.StreamService.Streams[stream.ProtocolSendPlanner] { // send Planner can be long lived - it's a conn - n.StreamService.PublishCommon(nil, resp.User, pid.String(), stream.ProtocolSendPlanner, b) + n.StreamService.PublishCommon(nil, resp.User, resp.Groups, pid.String(), stream.ProtocolSendPlanner, b) } } } else { - n.StreamService.PublishCommon(nil, resp.User, fmt.Sprintf("%v", m["peer_id"]), stream.ProtocolSendPlanner, b) + n.StreamService.PublishCommon(nil, resp.User, resp.Groups, fmt.Sprintf("%v", m["peer_id"]), stream.ProtocolSendPlanner, b) } n.StreamService.Mu.Unlock() } @@ -211,7 +211,7 @@ func ListenNATS(n *Node) { context.Background(), dt, fmt.Sprintf("%v", m["type"]), - resp.User, + resp.User, resp.Groups, fmt.Sprintf("%v", m["search"]), ) } diff --git a/daemons/node/pubsub/publish.go b/daemons/node/pubsub/publish.go index 296ce18..817dd28 100644 --- a/daemons/node/pubsub/publish.go +++ b/daemons/node/pubsub/publish.go @@ -14,16 +14,16 @@ import ( ) func (ps *PubSubService) SearchPublishEvent( - ctx context.Context, dt *tools.DataType, typ string, user string, search string) error { + ctx context.Context, dt *tools.DataType, typ string, user string, groups []string, search string) error { b, err := json.Marshal(map[string]string{"search": search}) if err != nil { return err } switch typ { case "known": // define Search Strategy - return ps.StreamService.PublishesCommon(dt, user, nil, b, stream.ProtocolSearchResource) //if partners focus only them*/ + return ps.StreamService.PublishesCommon(dt, user, groups, nil, b, stream.ProtocolSearchResource) //if partners focus only them*/ case "partner": // define Search Strategy - return ps.StreamService.PublishesCommon(dt, user, &dbs.Filters{ // filter by like name, short_description, description, owner, url if no filters are provided + return ps.StreamService.PublishesCommon(dt, user, groups, &dbs.Filters{ // filter by like name, short_description, description, owner, url if no filters are provided And: map[string][]dbs.Filter{ "relation": {{Operator: dbs.EQUAL.String(), Value: peer.PARTNER}}, }, diff --git a/daemons/node/stream/handler.go b/daemons/node/stream/handler.go index e2503c3..8fcd438 100644 --- a/daemons/node/stream/handler.go +++ b/daemons/node/stream/handler.go @@ -80,7 +80,7 @@ func (abs *StreamService) verifyResponse(event *common.Event) error { // } } if b, err := json.Marshal(verify); err == nil { - abs.PublishCommon(nil, "", event.From, ProtocolVerifyResource, b) + abs.PublishCommon(nil, event.User, event.Groups, event.From, ProtocolVerifyResource, b) } return nil } @@ -89,7 +89,7 @@ func (abs *StreamService) sendPlanner(event *common.Event) error { // if len(event.Payload) == 0 { if plan, err := planner.GenerateShallow(&tools.APIRequest{Admin: true}); err == nil { if b, err := json.Marshal(plan); err == nil { - abs.PublishCommon(nil, event.User, event.From, ProtocolSendPlanner, b) + abs.PublishCommon(nil, event.User, event.Groups, event.From, ProtocolSendPlanner, b) } else { return err } @@ -214,11 +214,11 @@ func (abs *StreamService) SendResponse(p *peer.Peer, event *common.Event, search for _, dt := range dts { access := oclib.NewRequestAdmin(oclib.LibDataEnum(dt), nil) peerID := p.GetID() - searched := access.Search(abs.FilterPeer(self.GetID(), search), "", false) + searched := access.Search(abs.FilterPeer(self.GetID(), event.Groups, search), "", false) fmt.Println("SEND SEARCH_EVENT", self.GetID(), dt, len(searched.Data), peerID) for _, ss := range searched.Data { if j, err := json.Marshal(ss); err == nil { - _, err := abs.PublishCommon(&dt, event.User, p.PeerID, ProtocolSearchResource, j) + _, err := abs.PublishCommon(&dt, event.User, event.Groups, p.PeerID, ProtocolSearchResource, j) fmt.Println("Publish ERR", err) } } diff --git a/daemons/node/stream/publish.go b/daemons/node/stream/publish.go index 3dd19cd..a5115f9 100644 --- a/daemons/node/stream/publish.go +++ b/daemons/node/stream/publish.go @@ -15,7 +15,7 @@ import ( "github.com/libp2p/go-libp2p/core/protocol" ) -func (ps *StreamService) PublishesCommon(dt *tools.DataType, user string, filter *dbs.Filters, resource []byte, protos ...protocol.ID) error { +func (ps *StreamService) PublishesCommon(dt *tools.DataType, user string, groups []string, filter *dbs.Filters, resource []byte, protos ...protocol.ID) error { access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil) var p oclib.LibDataShallow if filter == nil { @@ -25,7 +25,7 @@ func (ps *StreamService) PublishesCommon(dt *tools.DataType, user string, filter } for _, pes := range p.Data { for _, proto := range protos { - if _, err := ps.PublishCommon(dt, user, pes.(*peer.Peer).PeerID, proto, resource); err != nil { + if _, err := ps.PublishCommon(dt, user, groups, pes.(*peer.Peer).PeerID, proto, resource); err != nil { return err } } @@ -33,7 +33,7 @@ func (ps *StreamService) PublishesCommon(dt *tools.DataType, user string, filter return nil } -func (ps *StreamService) PublishCommon(dt *tools.DataType, user string, toPeerID string, proto protocol.ID, resource []byte) (*common.Stream, error) { +func (ps *StreamService) PublishCommon(dt *tools.DataType, user string, groups []string, toPeerID string, proto protocol.ID, resource []byte) (*common.Stream, error) { fmt.Println("PublishCommon") if toPeerID == ps.Key.String() { fmt.Println("Can't send to ourself !") @@ -64,7 +64,7 @@ func (ps *StreamService) PublishCommon(dt *tools.DataType, user string, toPeerID } func (ps *StreamService) ToPartnerPublishEvent( - ctx context.Context, action tools.PubSubAction, dt *tools.DataType, user string, payload []byte) error { + ctx context.Context, action tools.PubSubAction, dt *tools.DataType, user string, groups []string, payload []byte) error { if *dt == tools.PEER { var p peer.Peer if err := json.Unmarshal(payload, &p); err != nil { @@ -82,7 +82,7 @@ func (ps *StreamService) ToPartnerPublishEvent( pe.Relation = p.Relation pe.Verify = false if b2, err := json.Marshal(pe); err == nil { - if _, err := ps.PublishCommon(dt, user, p.PeerID, ProtocolUpdateResource, b2); err != nil { + if _, err := ps.PublishCommon(dt, user, groups, p.PeerID, ProtocolUpdateResource, b2); err != nil { return err } @@ -102,7 +102,7 @@ func (ps *StreamService) ToPartnerPublishEvent( case tools.PB_UPDATE: proto = ProtocolUpdateResource } - ps.PublishesCommon(dt, user, &dbs.Filters{ // filter by like name, short_description, description, owner, url if no filters are provided + ps.PublishesCommon(dt, user, groups, &dbs.Filters{ // filter by like name, short_description, description, owner, url if no filters are provided And: map[string][]dbs.Filter{ "relation": {{Operator: dbs.EQUAL.String(), Value: peer.PARTNER}}, }, diff --git a/daemons/node/stream/service.go b/daemons/node/stream/service.go index 9598329..7ce16e6 100644 --- a/daemons/node/stream/service.go +++ b/daemons/node/stream/service.go @@ -54,7 +54,7 @@ type StreamService struct { Key pp.ID Host host.Host Node common.DiscoveryPeer - Streams common.ProtocolStream + Streams common.ProtocolStream maxNodesConn int Mu sync.RWMutex } @@ -65,7 +65,7 @@ func InitStream(ctx context.Context, h host.Host, key pp.ID, maxNode int, node c Key: key, Node: node, Host: h, - Streams: common.ProtocolStream{}, + Streams: common.ProtocolStream{}, maxNodesConn: maxNode, } for proto := range protocols { @@ -101,7 +101,6 @@ func (s *StreamService) HandleResponse(stream network.Stream) { stream.Protocol(), protocols[stream.Protocol()]) } - func (s *StreamService) connectToPartners() error { logger := oclib.GetLogger() // Register handlers for partner resource protocols (create/update/delete). @@ -123,7 +122,6 @@ func (s *StreamService) connectToPartners() error { return nil } - func (s *StreamService) searchPeer(search string) ([]*peer.Peer, error) { ps := []*peer.Peer{} if conf.GetConfig().PeerIDS != "" { @@ -219,11 +217,12 @@ func (ps *StreamService) readLoop(s *common.Stream, id pp.ID, proto protocol.ID, } } -func (abs *StreamService) FilterPeer(peerID string, search string) *dbs.Filters { +func (abs *StreamService) FilterPeer(peerID string, groups []string, search string) *dbs.Filters { p, err := oclib.GetMySelf() if err != nil { return nil } + groups = append(groups, "*") filter := map[string][]dbs.Filter{ "abstractinstanciatedresource.abstractresource.abstractobject.creator_id": {{Operator: dbs.EQUAL.String(), Value: p.GetID()}}, // is my resource... "": {{Operator: dbs.OR.String(), Value: &dbs.Filters{ @@ -233,7 +232,7 @@ func (abs *StreamService) FilterPeer(peerID string, search string) *dbs.Filters And: map[string][]dbs.Filter{ "resourceinstance.partnerships": {{Operator: dbs.ELEMMATCH.String(), Value: &dbs.Filters{ And: map[string][]dbs.Filter{ - "resourcepartnership.peer_groups." + peerID: {{Operator: dbs.EXISTS.String(), Value: true}}, + "resourcepartnership.peer_groups." + peerID: {{Operator: dbs.IN.String(), Value: groups}}, }, }}}, },