232 lines
6.8 KiB
Go
232 lines
6.8 KiB
Go
package stream
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"oc-discovery/daemons/node/common"
|
|
"sync"
|
|
"time"
|
|
|
|
oclib "cloud.o-forge.io/core/oc-lib"
|
|
"cloud.o-forge.io/core/oc-lib/dbs"
|
|
"cloud.o-forge.io/core/oc-lib/models/peer"
|
|
"github.com/libp2p/go-libp2p/core/host"
|
|
"github.com/libp2p/go-libp2p/core/network"
|
|
pp "github.com/libp2p/go-libp2p/core/peer"
|
|
"github.com/libp2p/go-libp2p/core/protocol"
|
|
)
|
|
|
|
const ProtocolSearchResource = "/opencloud/resource/search/1.0"
|
|
const ProtocolCreateResource = "/opencloud/resource/create/1.0"
|
|
const ProtocolUpdateResource = "/opencloud/resource/update/1.0"
|
|
const ProtocolDeleteResource = "/opencloud/resource/delete/1.0"
|
|
|
|
const ProtocolHeartbeatPartner = "/opencloud/resource/heartbeat/partner/1.0"
|
|
|
|
var protocols = []protocol.ID{
|
|
ProtocolSearchResource,
|
|
ProtocolCreateResource,
|
|
ProtocolUpdateResource,
|
|
ProtocolDeleteResource,
|
|
}
|
|
|
|
type StreamService struct {
|
|
Key pp.ID
|
|
Host host.Host
|
|
Node common.DiscoveryPeer
|
|
Streams common.ProtocolStream
|
|
maxNodesConn int
|
|
mu sync.Mutex
|
|
// Stream map[protocol.ID]map[pp.ID]*daemons.Stream
|
|
}
|
|
|
|
func InitStream(ctx context.Context, h host.Host, key pp.ID, maxNode int, node common.DiscoveryPeer) (*StreamService, error) {
|
|
logger := oclib.GetLogger()
|
|
service := &StreamService{
|
|
Key: key,
|
|
Node: node,
|
|
Host: h,
|
|
Streams: common.ProtocolStream{},
|
|
maxNodesConn: maxNode,
|
|
}
|
|
logger.Info().Msg("handle to partner heartbeat protocol...")
|
|
service.Host.SetStreamHandler(ProtocolHeartbeatPartner, service.HandlePartnerHeartbeat)
|
|
logger.Info().Msg("connect to partners...")
|
|
service.connectToPartners() // we set up a stream
|
|
go service.StartGC(30 * time.Second)
|
|
return service, nil
|
|
}
|
|
|
|
func (s *StreamService) HandlePartnerHeartbeat(stream network.Stream) {
|
|
pid, hb, err := common.CheckHeartbeat(s.Host, stream, s.maxNodesConn)
|
|
if err != nil {
|
|
return
|
|
}
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
streams := s.Streams[ProtocolHeartbeatPartner]
|
|
if streams == nil {
|
|
s.Streams[ProtocolHeartbeatPartner] = map[pp.ID]*common.Stream{}
|
|
return
|
|
}
|
|
// if record already seen update last seen
|
|
if rec, ok := streams[*pid]; ok {
|
|
rec.DID = hb.DID
|
|
rec.Expiry = time.Now().UTC().Add(2 * time.Minute)
|
|
} else { // if not in stream ?
|
|
pid := stream.Conn().RemotePeer()
|
|
ai, err := pp.AddrInfoFromP2pAddr(stream.Conn().RemoteMultiaddr())
|
|
if err == nil {
|
|
s.ConnectToPartner(pid, ai)
|
|
}
|
|
}
|
|
go s.StartGC(30 * time.Second)
|
|
}
|
|
|
|
func (s *StreamService) connectToPartners() error {
|
|
peers, err := s.searchPeer(fmt.Sprintf("%v", peer.PARTNER.EnumIndex()))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, p := range peers {
|
|
ad, err := pp.AddrInfoFromString(p.StreamAddress)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
pid, err := pp.Decode(p.PeerID)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
s.ConnectToPartner(pid, ad)
|
|
// heartbeat your partner.
|
|
}
|
|
// TODO if handle... from partner then HeartBeat back
|
|
return nil
|
|
}
|
|
|
|
func (s *StreamService) ConnectToPartner(pid pp.ID, ad *pp.AddrInfo) {
|
|
for _, proto := range protocols {
|
|
f := func(ss network.Stream) {
|
|
if s.Streams[proto] == nil {
|
|
s.Streams[proto] = map[pp.ID]*common.Stream{}
|
|
}
|
|
s.Streams[proto][pid] = &common.Stream{
|
|
Stream: ss,
|
|
Expiry: time.Now().Add(2 * time.Minute),
|
|
}
|
|
go s.readLoop(s.Streams[proto][pid])
|
|
}
|
|
s.Streams = common.AddStreamProtocol(nil, s.Streams, s.Host, proto, pid, s.Key, &f)
|
|
if s.Streams[proto][pid] != nil {
|
|
go s.readLoop(s.Streams[proto][pid]) // reaaaad...
|
|
}
|
|
}
|
|
common.SendHeartbeat(context.Background(), ProtocolHeartbeatPartner,
|
|
s.Host, s.Streams, []*pp.AddrInfo{ad}, time.Minute)
|
|
}
|
|
|
|
func (s *StreamService) searchPeer(search string) ([]*peer.Peer, error) {
|
|
ps := []*peer.Peer{}
|
|
access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil)
|
|
peers := access.Search(nil, search, false)
|
|
if len(peers.Data) == 0 {
|
|
return ps, errors.New("no self available")
|
|
}
|
|
for _, p := range peers.Data {
|
|
ps = append(ps, p.(*peer.Peer))
|
|
}
|
|
return ps, nil
|
|
}
|
|
|
|
func (ix *StreamService) Close() {
|
|
for _, s := range ix.Streams {
|
|
for _, ss := range s {
|
|
ss.Stream.Close()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *StreamService) StartGC(interval time.Duration) {
|
|
go func() {
|
|
t := time.NewTicker(interval)
|
|
defer t.Stop()
|
|
for range t.C {
|
|
s.gc()
|
|
}
|
|
}()
|
|
}
|
|
|
|
func (s *StreamService) gc() {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
now := time.Now()
|
|
streams := s.Streams[ProtocolHeartbeatPartner]
|
|
if streams == nil {
|
|
s.Streams[ProtocolHeartbeatPartner] = map[pp.ID]*common.Stream{}
|
|
return
|
|
}
|
|
for pid, rec := range streams {
|
|
if now.After(rec.Expiry) {
|
|
for _, sstreams := range s.Streams {
|
|
if sstreams[pid] != nil {
|
|
sstreams[pid].Stream.Close()
|
|
delete(sstreams, pid)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (ps *StreamService) readLoop(s *common.Stream) {
|
|
dec := json.NewDecoder(s.Stream)
|
|
for {
|
|
var evt common.Event
|
|
if err := dec.Decode(&evt); err != nil {
|
|
s.Stream.Close()
|
|
return
|
|
}
|
|
ps.handleEvent(evt.Type, &evt)
|
|
}
|
|
}
|
|
|
|
func (abs *StreamService) FilterPeer(peerID string, search string) *dbs.Filters {
|
|
id, err := oclib.GetMySelf()
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
filter := map[string][]dbs.Filter{
|
|
"creator_id": {{Operator: dbs.EQUAL.String(), Value: id}}, // is my resource...
|
|
"": {{Operator: dbs.OR.String(), Value: &dbs.Filters{
|
|
Or: map[string][]dbs.Filter{
|
|
"abstractobject.access_mode": {{Operator: dbs.EQUAL.String(), Value: 1}}, // if public
|
|
"abstractinstanciatedresource.instances": {{Operator: dbs.ELEMMATCH.String(), Value: &dbs.Filters{ // or got a partners instances
|
|
And: map[string][]dbs.Filter{
|
|
"resourceinstance.partnerships": {{Operator: dbs.ELEMMATCH.String(), Value: &dbs.Filters{
|
|
And: map[string][]dbs.Filter{
|
|
"resourcepartnership.peer_groups." + peerID: {{Operator: dbs.EXISTS.String(), Value: true}},
|
|
},
|
|
}}},
|
|
},
|
|
}}},
|
|
},
|
|
}}},
|
|
}
|
|
if search != "" {
|
|
filter[" "] = []dbs.Filter{{Operator: dbs.OR.String(), Value: &dbs.Filters{
|
|
Or: map[string][]dbs.Filter{ // filter by like name, short_description, description, owner, url if no filters are provided
|
|
"abstractintanciatedresource.abstractresource.abstractobject.name": {{Operator: dbs.LIKE.String(), Value: search}},
|
|
"abstractintanciatedresource.abstractresource.type": {{Operator: dbs.LIKE.String(), Value: search}},
|
|
"abstractintanciatedresource.abstractresource.short_description": {{Operator: dbs.LIKE.String(), Value: search}},
|
|
"abstractintanciatedresource.abstractresource.description": {{Operator: dbs.LIKE.String(), Value: search}},
|
|
"abstractintanciatedresource.abstractresource.owners.name": {{Operator: dbs.LIKE.String(), Value: search}},
|
|
"abstractintanciatedresource.abstractresource.abstractobject.creator_id": {{Operator: dbs.EQUAL.String(), Value: search}},
|
|
},
|
|
}}}
|
|
}
|
|
return &dbs.Filters{
|
|
And: filter,
|
|
}
|
|
}
|