Files
oc-discovery/daemons/node/stream/service.go
2026-02-02 09:05:58 +01:00

232 lines
6.8 KiB
Go

package stream
import (
"context"
"encoding/json"
"errors"
"fmt"
"oc-discovery/daemons/node/common"
"sync"
"time"
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/models/peer"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
pp "github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
)
const ProtocolSearchResource = "/opencloud/resource/search/1.0"
const ProtocolCreateResource = "/opencloud/resource/create/1.0"
const ProtocolUpdateResource = "/opencloud/resource/update/1.0"
const ProtocolDeleteResource = "/opencloud/resource/delete/1.0"
const ProtocolHeartbeatPartner = "/opencloud/resource/heartbeat/partner/1.0"
var protocols = []protocol.ID{
ProtocolSearchResource,
ProtocolCreateResource,
ProtocolUpdateResource,
ProtocolDeleteResource,
}
type StreamService struct {
Key pp.ID
Host host.Host
Node common.DiscoveryPeer
Streams common.ProtocolStream
maxNodesConn int
mu sync.Mutex
// Stream map[protocol.ID]map[pp.ID]*daemons.Stream
}
func InitStream(ctx context.Context, h host.Host, key pp.ID, maxNode int, node common.DiscoveryPeer) (*StreamService, error) {
logger := oclib.GetLogger()
service := &StreamService{
Key: key,
Node: node,
Host: h,
Streams: common.ProtocolStream{},
maxNodesConn: maxNode,
}
logger.Info().Msg("handle to partner heartbeat protocol...")
service.Host.SetStreamHandler(ProtocolHeartbeatPartner, service.HandlePartnerHeartbeat)
logger.Info().Msg("connect to partners...")
service.connectToPartners() // we set up a stream
go service.StartGC(30 * time.Second)
return service, nil
}
func (s *StreamService) HandlePartnerHeartbeat(stream network.Stream) {
pid, hb, err := common.CheckHeartbeat(s.Host, stream, s.maxNodesConn)
if err != nil {
return
}
s.mu.Lock()
defer s.mu.Unlock()
streams := s.Streams[ProtocolHeartbeatPartner]
if streams == nil {
s.Streams[ProtocolHeartbeatPartner] = map[pp.ID]*common.Stream{}
return
}
// if record already seen update last seen
if rec, ok := streams[*pid]; ok {
rec.DID = hb.DID
rec.Expiry = time.Now().UTC().Add(2 * time.Minute)
} else { // if not in stream ?
pid := stream.Conn().RemotePeer()
ai, err := pp.AddrInfoFromP2pAddr(stream.Conn().RemoteMultiaddr())
if err == nil {
s.connectToPartner(pid, ai)
}
}
go s.StartGC(30 * time.Second)
}
func (s *StreamService) connectToPartners() error {
peers, err := s.searchPeer(fmt.Sprintf("%v", peer.PARTNER.EnumIndex()))
if err != nil {
return err
}
for _, p := range peers {
ad, err := pp.AddrInfoFromString(p.StreamAddress)
if err != nil {
return err
}
pid, err := pp.Decode(p.PeerID)
if err != nil {
continue
}
s.connectToPartner(pid, ad)
// heartbeat your partner.
}
// TODO if handle... from partner then HeartBeat back
return nil
}
func (s *StreamService) connectToPartner(pid pp.ID, ad *pp.AddrInfo) {
for _, proto := range protocols {
f := func(ss network.Stream) {
if s.Streams[proto] == nil {
s.Streams[proto] = map[pp.ID]*common.Stream{}
}
s.Streams[proto][pid] = &common.Stream{
Stream: ss,
Expiry: time.Now().Add(2 * time.Minute),
}
go s.readLoop(s.Streams[proto][pid])
}
s.Streams = common.AddStreamProtocol(nil, s.Streams, s.Host, proto, pid, s.Key, &f)
if s.Streams[proto][pid] != nil {
go s.readLoop(s.Streams[proto][pid]) // reaaaad...
}
}
common.SendHeartbeat(context.Background(), ProtocolHeartbeatPartner,
s.Host, s.Streams, []*pp.AddrInfo{ad}, time.Minute)
}
func (s *StreamService) searchPeer(search string) ([]*peer.Peer, error) {
ps := []*peer.Peer{}
access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil)
peers := access.Search(nil, search, false)
if len(peers.Data) == 0 {
return ps, errors.New("no self available")
}
for _, p := range peers.Data {
ps = append(ps, p.(*peer.Peer))
}
return ps, nil
}
func (ix *StreamService) Close() {
for _, s := range ix.Streams {
for _, ss := range s {
ss.Stream.Close()
}
}
}
func (s *StreamService) StartGC(interval time.Duration) {
go func() {
t := time.NewTicker(interval)
defer t.Stop()
for range t.C {
s.gc()
}
}()
}
func (s *StreamService) gc() {
s.mu.Lock()
defer s.mu.Unlock()
now := time.Now()
streams := s.Streams[ProtocolHeartbeatPartner]
if streams == nil {
s.Streams[ProtocolHeartbeatPartner] = map[pp.ID]*common.Stream{}
return
}
for pid, rec := range streams {
if now.After(rec.Expiry) {
for _, sstreams := range s.Streams {
if sstreams[pid] != nil {
sstreams[pid].Stream.Close()
delete(sstreams, pid)
}
}
}
}
}
func (ps *StreamService) readLoop(s *common.Stream) {
dec := json.NewDecoder(s.Stream)
for {
var evt common.Event
if err := dec.Decode(&evt); err != nil {
s.Stream.Close()
return
}
ps.handleEvent(evt.Type, &evt)
}
}
func (abs *StreamService) FilterPeer(peerID string, search string) *dbs.Filters {
id, err := oclib.GetMySelf()
if err != nil {
return nil
}
filter := map[string][]dbs.Filter{
"creator_id": {{Operator: dbs.EQUAL.String(), Value: id}}, // is my resource...
"": {{Operator: dbs.OR.String(), Value: &dbs.Filters{
Or: map[string][]dbs.Filter{
"abstractobject.access_mode": {{Operator: dbs.EQUAL.String(), Value: 1}}, // if public
"abstractinstanciatedresource.instances": {{Operator: dbs.ELEMMATCH.String(), Value: &dbs.Filters{ // or got a partners instances
And: map[string][]dbs.Filter{
"resourceinstance.partnerships": {{Operator: dbs.ELEMMATCH.String(), Value: &dbs.Filters{
And: map[string][]dbs.Filter{
"resourcepartnership.peer_groups." + peerID: {{Operator: dbs.EXISTS.String(), Value: true}},
},
}}},
},
}}},
},
}}},
}
if search != "" {
filter[" "] = []dbs.Filter{{Operator: dbs.OR.String(), Value: &dbs.Filters{
Or: map[string][]dbs.Filter{ // filter by like name, short_description, description, owner, url if no filters are provided
"abstractintanciatedresource.abstractresource.abstractobject.name": {{Operator: dbs.LIKE.String(), Value: search}},
"abstractintanciatedresource.abstractresource.type": {{Operator: dbs.LIKE.String(), Value: search}},
"abstractintanciatedresource.abstractresource.short_description": {{Operator: dbs.LIKE.String(), Value: search}},
"abstractintanciatedresource.abstractresource.description": {{Operator: dbs.LIKE.String(), Value: search}},
"abstractintanciatedresource.abstractresource.owners.name": {{Operator: dbs.LIKE.String(), Value: search}},
"abstractintanciatedresource.abstractresource.abstractobject.creator_id": {{Operator: dbs.EQUAL.String(), Value: search}},
},
}}}
}
return &dbs.Filters{
And: filter,
}
}