diff --git a/conf/config.go b/conf/config.go index a5c22f4..875f1da 100644 --- a/conf/config.go +++ b/conf/config.go @@ -11,6 +11,8 @@ type Config struct { NodeEndpointPort int64 IndexerAddresses string + PeerIDS string // TO REMOVE + NodeMode string } diff --git a/daemons/node/common/common_pubsub.go b/daemons/node/common/common_pubsub.go index a1ffb3e..e060a23 100644 --- a/daemons/node/common/common_pubsub.go +++ b/daemons/node/common/common_pubsub.go @@ -35,7 +35,7 @@ func NewEvent(name string, from string, dt *tools.DataType, user string, payload Type: name, From: from, User: user, - Timestamp: time.Now().Unix(), + Timestamp: time.Now().UTC().Unix(), Payload: payload, } if dt != nil { diff --git a/daemons/node/common/common_stream.go b/daemons/node/common/common_stream.go index 3e8b5ca..a96f1c5 100644 --- a/daemons/node/common/common_stream.go +++ b/daemons/node/common/common_stream.go @@ -52,17 +52,17 @@ func (ix *LongLivedStreamRecordedService[T]) StartGC(interval time.Duration) { func (ix *LongLivedStreamRecordedService[T]) gc() { ix.StreamMU.Lock() defer ix.StreamMU.Unlock() - now := time.Now() - streams := ix.StreamRecords[ProtocolHeartbeat] - if streams == nil { + now := time.Now().UTC() + if ix.StreamRecords[ProtocolHeartbeat] == nil { ix.StreamRecords[ProtocolHeartbeat] = map[pp.ID]*StreamRecord[T]{} return } + streams := ix.StreamRecords[ProtocolHeartbeat] + for pid, rec := range streams { if now.After(rec.HeartbeatStream.Expiry) || now.Sub(rec.LastSeen) > 2*rec.HeartbeatStream.Expiry.Sub(now) { for _, sstreams := range ix.StreamRecords { if sstreams[pid] != nil { - sstreams[pid].Stream.Close() delete(sstreams, pid) } } @@ -111,40 +111,49 @@ func (ix *LongLivedStreamRecordedService[T]) snapshot() []*StreamRecord[T] { } func (ix *LongLivedStreamRecordedService[T]) HandleNodeHeartbeat(s network.Stream) { - streams := ix.StreamRecords[ProtocolHeartbeat] - pid, hb, err := CheckHeartbeat(ix.Host, s, ix.maxNodesConn) - if err != nil { - return + defer s.Close() + for { + pid, hb, err := CheckHeartbeat(ix.Host, s, ix.maxNodesConn) + if err != nil { + continue + } + ix.StreamMU.Lock() + if ix.StreamRecords[ProtocolHeartbeat] == nil { + ix.StreamRecords[ProtocolHeartbeat] = map[pp.ID]*StreamRecord[T]{} + } + streams := ix.StreamRecords[ProtocolHeartbeat] + // if record already seen update last seen + if rec, ok := streams[*pid]; ok { + rec.DID = hb.DID + rec.Stream = s + rec.HeartbeatStream = hb.Stream + rec.LastSeen = time.Now().UTC() + } else { + streams[*pid] = &StreamRecord[T]{ + DID: hb.DID, + HeartbeatStream: hb.Stream, + Stream: s, + LastSeen: time.Now().UTC(), + } + } + ix.StreamMU.Unlock() } - ix.StreamMU.Lock() - defer ix.StreamMU.Unlock() - if streams == nil { - ix.StreamRecords[ProtocolHeartbeat] = map[pp.ID]*StreamRecord[T]{} - return - } - // if record already seen update last seen - if rec, ok := streams[*pid]; ok { - rec.DID = hb.DID - rec.Stream = s - rec.HeartbeatStream = hb.Stream - rec.LastSeen = time.Unix(hb.Timestamp, 0) - } - - // si je l'handle et que je ne suis pas dans une } func CheckHeartbeat(h host.Host, s network.Stream, maxNodes int) (*pp.ID, *Heartbeat, error) { if len(h.Network().Peers()) >= maxNodes { return nil, nil, fmt.Errorf("too many connections, try another indexer") } - defer s.Close() - var hb Heartbeat if err := json.NewDecoder(s).Decode(&hb); err != nil { return nil, nil, err } pid, err := pp.Decode(hb.PeerID) - hb.Stream.Stream = s // here is the long-lived bidirectionnal heart bit. + hb.Stream = &Stream{ + DID: hb.DID, + Stream: s, + Expiry: time.Now().UTC().Add(2 * time.Minute), + } // here is the long-lived bidirectionnal heart bit. return &pid, &hb, err } @@ -242,7 +251,7 @@ func ConnectToIndexers(h host.Host, minIndexer int, maxIndexer int, myPID pp.ID) StaticIndexers = append(StaticIndexers, ad) // make a privilege streams with indexer. for _, proto := range []protocol.ID{ProtocolPublish, ProtocolGet, ProtocolHeartbeat} { - AddStreamProtocol(nil, StreamIndexers, h, proto, ad.ID, myPID, nil) + AddStreamProtocol(nil, StreamIndexers, h, proto, ad.ID, myPID, true, nil) } } if len(StaticIndexers) == 0 { @@ -252,21 +261,21 @@ func ConnectToIndexers(h host.Host, minIndexer int, maxIndexer int, myPID pp.ID) if len(StaticIndexers) < minIndexer { // TODO : ask for unknown indexer. } - SendHeartbeat(ctx, ProtocolHeartbeat, h, StreamIndexers, StaticIndexers, 20) // your indexer is just like a node for the next indexer. + SendHeartbeat(ctx, ProtocolHeartbeat, h, StreamIndexers, StaticIndexers, 20*time.Second) // your indexer is just like a node for the next indexer. } -func AddStreamProtocol(ctx *context.Context, protoS ProtocolStream, h host.Host, proto protocol.ID, id pp.ID, mypid pp.ID, onStreamCreated *func(network.Stream)) ProtocolStream { +func AddStreamProtocol(ctx *context.Context, protoS ProtocolStream, h host.Host, proto protocol.ID, id pp.ID, mypid pp.ID, force bool, onStreamCreated *func(network.Stream)) ProtocolStream { if onStreamCreated == nil { f := func(s network.Stream) { protoS[proto][id] = &Stream{ Stream: s, - Expiry: time.Now().Add(2 * time.Minute), + Expiry: time.Now().UTC().Add(2 * time.Minute), } } onStreamCreated = &f } f := *onStreamCreated - if mypid > id { + if mypid > id || force { if ctx == nil { c := context.Background() ctx = &c @@ -278,16 +287,14 @@ func AddStreamProtocol(ctx *context.Context, protoS ProtocolStream, h host.Host, if protoS[proto][id] != nil { protoS[proto][id].Expiry = time.Now().Add(2 * time.Minute) } else { + fmt.Println("GENERATE STREAM", proto, id) s, err := h.NewStream(*ctx, id, proto) if err != nil { panic(err.Error()) } f(s) } - } else { - h.SetStreamHandler(proto, f) } - return protoS } @@ -318,7 +325,7 @@ func SendHeartbeat(ctx context.Context, proto protocol.ID, h host.Host, ps Proto hb := Heartbeat{ DID: peerID, PeerID: h.ID().String(), - Timestamp: time.Now().Unix(), + Timestamp: time.Now().UTC().Unix(), } for _, ix := range peers { _ = sendHeartbeat(ctx, h, proto, ix, hb, ps, interval*time.Second) @@ -330,7 +337,8 @@ func SendHeartbeat(ctx context.Context, proto protocol.ID, h host.Host, ps Proto }() } -func sendHeartbeat(ctx context.Context, h host.Host, proto protocol.ID, p *pp.AddrInfo, hb Heartbeat, ps ProtocolStream, interval time.Duration) error { +func sendHeartbeat(ctx context.Context, h host.Host, proto protocol.ID, p *pp.AddrInfo, + hb Heartbeat, ps ProtocolStream, interval time.Duration) error { streams := ps.Get(proto) if len(streams) == 0 { return errors.New("no stream for protocol heartbeat founded") diff --git a/daemons/node/common/crypto.go b/daemons/node/common/crypto.go index 4da2c6f..854ca0d 100644 --- a/daemons/node/common/crypto.go +++ b/daemons/node/common/crypto.go @@ -49,14 +49,11 @@ func Verify(pub crypto.PubKey, data, sig []byte) (bool, error) { func LoadKeyFromFilePrivate() (crypto.PrivKey, error) { path := conf.GetConfig().PrivateKeyPath - fmt.Println("extract " + path) data, err := os.ReadFile(path) if err != nil { return nil, err } - fmt.Println(data) block, _ := pem.Decode(data) - fmt.Println(block.Bytes) keyAny, err := x509.ParsePKCS8PrivateKey(block.Bytes) if err != nil { return nil, err @@ -71,7 +68,6 @@ func LoadKeyFromFilePrivate() (crypto.PrivKey, error) { func LoadKeyFromFilePublic() (crypto.PubKey, error) { path := conf.GetConfig().PublicKeyPath - fmt.Println("extract " + path) data, err := os.ReadFile(path) if err != nil { return nil, err @@ -100,7 +96,6 @@ func LoadPSKFromFile() (pnet.PSK, error) { if err != nil { return nil, err } - fmt.Println("PSK found.") return psk, nil } diff --git a/daemons/node/indexer/handler.go b/daemons/node/indexer/handler.go index b7f6dea..78490ca 100644 --- a/daemons/node/indexer/handler.go +++ b/daemons/node/indexer/handler.go @@ -4,9 +4,11 @@ import ( "encoding/base64" "encoding/json" "errors" + "fmt" "oc-discovery/daemons/node/common" "time" + oclib "cloud.o-forge.io/core/oc-lib" pp "cloud.o-forge.io/core/oc-lib/models/peer" "cloud.o-forge.io/core/oc-lib/models/utils" "cloud.o-forge.io/core/oc-lib/tools" @@ -48,8 +50,10 @@ func (p *PeerRecord) Sign() error { } func (p *PeerRecord) Verify() (crypto.PubKey, error) { + fmt.Println(p.PubKey) pubKey, err := crypto.UnmarshalPublicKey(p.PubKey) // retrieve pub key in message if err != nil { + fmt.Println("UnmarshalPublicKey") return pubKey, err } dht := PeerRecord{ @@ -61,14 +65,18 @@ func (p *PeerRecord) Verify() (crypto.PubKey, error) { payload, _ := json.Marshal(dht) if ok, _ := common.Verify(pubKey, payload, p.Signature); !ok { // verify minimal message was sign per pubKey + fmt.Println("Verify") return pubKey, errors.New("invalid signature") } return pubKey, nil } func (pr *PeerRecord) ExtractPeer(ourkey string, key string, pubKey crypto.PubKey) (bool, *pp.Peer, error) { - pubBytes, _ := pubKey.Raw() - + pubBytes, err := crypto.MarshalPublicKey(pubKey) + if err != nil { + return false, nil, err + } + fmt.Println("ExtractPeer MarshalPublicKey") rel := pp.NONE if ourkey == key { // at this point is PeerID is same as our... we are... thats our peer INFO rel = pp.SELF @@ -88,14 +96,14 @@ func (pr *PeerRecord) ExtractPeer(ourkey string, key string, pubKey crypto.PubKe NATSAddress: pr.NATSAddress, WalletAddress: pr.WalletAddress, } - if time.Now().After(pr.ExpiryDate) { // is expired + if time.Now().UTC().After(pr.ExpiryDate) { // is expired p.State = pp.OFFLINE // then is considers OFFLINE } b, err := json.Marshal(p) if err != nil { return pp.SELF == p.Relation, nil, err } - tools.NewNATSCaller().SetNATSPub(tools.CREATE_RESOURCE, tools.NATSResponse{ + go tools.NewNATSCaller().SetNATSPub(tools.CREATE_RESOURCE, tools.NATSResponse{ FromApp: "oc-discovery", Datatype: tools.PEER, Method: int(tools.CREATE_PEER), @@ -124,108 +132,135 @@ func (ix *IndexerService) initNodeHandler() { func (ix *IndexerService) handleNodePublish(s network.Stream) { defer s.Close() - - var rec PeerRecord - if err := json.NewDecoder(s).Decode(&rec); err != nil { - return - } - if rec.PeerID == "" || rec.ExpiryDate.Before(time.Now()) { // already expired - return - } - pid, err := peer.Decode(rec.PeerID) - if err != nil { - return - } - - ix.StreamMU.Lock() - defer ix.StreamMU.Unlock() - - streams := ix.StreamRecords[common.ProtocolPublish] - if streams == nil { - ix.StreamRecords[common.ProtocolPublish] = map[peer.ID]*common.StreamRecord[PeerRecord]{} - return - } - - if srec, ok := streams[pid]; ok { - srec.DID = rec.DID - srec.Record = rec - srec.LastSeen = time.Now() - } else { - streams[pid] = &common.StreamRecord[PeerRecord]{ // HeartBeat wil - DID: rec.DID, - Record: rec, - LastSeen: time.Now(), + logger := oclib.GetLogger() + for { + var rec PeerRecord + if err := json.NewDecoder(s).Decode(&rec); err != nil { + continue + } + rec2 := PeerRecord{ + Name: rec.Name, + DID: rec.DID, // REAL PEER ID + PubKey: rec.PubKey, + PeerID: rec.PeerID, + } + if _, err := rec2.Verify(); err != nil { + logger.Err(err) + continue + } + if rec.PeerID == "" || rec.ExpiryDate.Before(time.Now().UTC()) { // already expired + logger.Warn().Msg(rec.PeerID + " is expired.") + continue + } + pid, err := peer.Decode(rec.PeerID) + if err != nil { + continue } - } - if rec.TTL > 0 { - for _, ad := range common.StaticIndexers { - if common.StreamIndexers[common.ProtocolPublish][ad.ID] == nil { - continue - } - stream := common.StreamIndexers[common.ProtocolPublish][ad.ID] - rec.TTL -= 1 - if err := json.NewEncoder(stream.Stream).Encode(&rec); err != nil { // then publish on stream - continue + ix.StreamMU.Lock() + + if ix.StreamRecords[common.ProtocolPublish] == nil { + ix.StreamRecords[common.ProtocolPublish] = map[peer.ID]*common.StreamRecord[PeerRecord]{} + } + streams := ix.StreamRecords[common.ProtocolPublish] + + if srec, ok := streams[pid]; ok { + fmt.Println("UPDATE PUBLISH", pid) + srec.DID = rec.DID + srec.Record = rec + srec.LastSeen = time.Now().UTC() + } else { + fmt.Println("CREATE PUBLISH", pid) + streams[pid] = &common.StreamRecord[PeerRecord]{ // HeartBeat wil + DID: rec.DID, + Record: rec, + LastSeen: time.Now().UTC(), } } + + if rec.TTL > 0 { + for _, ad := range common.StaticIndexers { + if ad.ID == s.Conn().RemotePeer() { + continue + } + if common.StreamIndexers[common.ProtocolPublish][ad.ID] == nil { + continue + } + stream := common.StreamIndexers[common.ProtocolPublish][ad.ID] + rec.TTL -= 1 + if err := json.NewEncoder(stream.Stream).Encode(&rec); err != nil { // then publish on stream + continue + } + } + } + ix.StreamMU.Unlock() } } func (ix *IndexerService) handleNodeGet(s network.Stream) { defer s.Close() + logger := oclib.GetLogger() + for { + var req GetValue + if err := json.NewDecoder(s).Decode(&req); err != nil { + logger.Err(err) + continue + } + ix.StreamMU.Lock() - var req GetValue - if err := json.NewDecoder(s).Decode(&req); err != nil { - return - } - ix.StreamMU.Lock() - defer ix.StreamMU.Unlock() - - streams := ix.StreamRecords[common.ProtocolGet] - if streams == nil { - ix.StreamRecords[common.ProtocolGet] = map[peer.ID]*common.StreamRecord[PeerRecord]{} - return - } - - // simple lookup by PeerID (or DID) - for _, rec := range streams { - if rec.Record.DID == req.Key || rec.Record.PeerID == req.Key { // OK - resp := GetResponse{ - Found: true, - Record: rec.Record, + if ix.StreamRecords[common.ProtocolGet] == nil { + ix.StreamRecords[common.ProtocolGet] = map[peer.ID]*common.StreamRecord[PeerRecord]{} + } + streams := ix.StreamRecords[common.ProtocolPublish] + // simple lookup by PeerID (or DID) + for _, rec := range streams { + if rec.Record.DID == req.Key || rec.Record.PeerID == req.Key { // OK + resp := GetResponse{ + Found: true, + Record: rec.Record, + } + _ = json.NewEncoder(s).Encode(resp) + break } - _ = json.NewEncoder(s).Encode(resp) - return } - } - // if not found ask to my neighboor indexers - if common.StreamIndexers[common.ProtocolPublish] == nil { + // if not found ask to my neighboor indexers + if common.StreamIndexers[common.ProtocolGet] == nil { + _ = json.NewEncoder(s).Encode(GetResponse{Found: false}) + ix.StreamMU.Unlock() + continue + } + for _, ad := range common.StaticIndexers { + if ad.ID == s.Conn().RemotePeer() { + continue + } + if common.StreamIndexers[common.ProtocolGet][ad.ID] == nil { + continue + } + stream := common.StreamIndexers[common.ProtocolGet][ad.ID] + if err := json.NewEncoder(stream.Stream).Encode(GetValue{Key: req.Key}); err != nil { + continue + } + + var resp GetResponse + if err := json.NewDecoder(stream.Stream).Decode(&resp); err != nil { + continue + } + if resp.Found { + for _, rec := range streams { + if rec.Record.DID == req.Key || rec.Record.PeerID == req.Key { // OK + resp := GetResponse{ + Found: true, + Record: rec.Record, + } + _ = json.NewEncoder(s).Encode(resp) + break + } + } + continue + } + } + // Not found _ = json.NewEncoder(s).Encode(GetResponse{Found: false}) - return + ix.StreamMU.Unlock() } - for _, ad := range common.StaticIndexers { - if common.StreamIndexers[common.ProtocolPublish][ad.ID] == nil { - continue - } - stream := common.StreamIndexers[common.ProtocolPublish][ad.ID] - if err := json.NewEncoder(stream.Stream).Encode(GetValue{Key: req.Key}); err != nil { - continue - } - - var resp GetResponse - if err := json.NewDecoder(stream.Stream).Decode(&resp); err != nil { - continue - } - if resp.Found { - _ = json.NewEncoder(s).Encode(GetResponse{ - Found: true, - Record: resp.Record, - }) - return - } - } - - // Not found - _ = json.NewEncoder(s).Encode(GetResponse{Found: false}) } diff --git a/daemons/node/node.go b/daemons/node/node.go index 9abd692..615fc75 100644 --- a/daemons/node/node.go +++ b/daemons/node/node.go @@ -15,8 +15,10 @@ import ( oclib "cloud.o-forge.io/core/oc-lib" "cloud.o-forge.io/core/oc-lib/models/peer" + "github.com/google/uuid" "github.com/libp2p/go-libp2p" pubsubs "github.com/libp2p/go-libp2p-pubsub" + "github.com/libp2p/go-libp2p/core/crypto" pp "github.com/libp2p/go-libp2p/core/peer" ) @@ -53,6 +55,7 @@ func InitNode(isNode bool, isIndexer bool) (*Node, error) { fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", conf.GetConfig().NodeEndpointPort), ), ) + logger.Info().Msg("Host open on " + h.ID().String()) if err != nil { return nil, errors.New("no host no node") } @@ -72,7 +75,9 @@ func InitNode(isNode bool, isIndexer bool) (*Node, error) { logger.Info().Msg("connect to indexers...") common.ConnectToIndexers(node.Host, 0, 5, node.PeerID) // TODO : make var to change how many indexers are allowed. logger.Info().Msg("claims my node...") - node.claimInfo(conf.GetConfig().Name, conf.GetConfig().Hostname) + if _, err := node.claimInfo(conf.GetConfig().Name, conf.GetConfig().Hostname); err != nil { + panic(err) + } logger.Info().Msg("subscribe to decentralized search flow...") node.SubscribeToSearch(node.PS) logger.Info().Msg("run garbage collector...") @@ -189,25 +194,21 @@ func (d *Node) claimInfo( if endPoint == "" { return nil, errors.New("no endpoint found for peer") } - - peerID, err := oclib.GenerateNodeID() - if err != nil { - return nil, err - } - + peerID := uuid.New().String() priv, err := common.LoadKeyFromFilePrivate() if err != nil { return nil, err } - pub, err := common.LoadKeyFromFilePublic() if err != nil { return nil, err } + pubBytes, err := crypto.MarshalPublicKey(pub) + if err != nil { + return nil, err + } - pubBytes, _ := pub.Raw() - - now := time.Now() + now := time.Now().UTC() expiry := now.Add(150 * time.Second) rec := &indexer.PeerRecord{ @@ -236,10 +237,11 @@ func (d *Node) claimInfo( if err := d.publishPeerRecord(rec); err != nil { return nil, err } - if pk, err := rec.Verify(); err != nil { + /*if pk, err := rec.Verify(); err != nil { + fmt.Println("Verify") return nil, err - } else { - _, p, err := rec.ExtractPeer(peerID, peerID, pk) - return p, err - } + } else {*/ + _, p, err := rec.ExtractPeer(peerID, peerID, pub) + return p, err + //} } diff --git a/daemons/node/stream/handler.go b/daemons/node/stream/handler.go index c9cd0f4..852d918 100644 --- a/daemons/node/stream/handler.go +++ b/daemons/node/stream/handler.go @@ -40,7 +40,7 @@ func (abs *StreamService) retrieveResponse(event *common.Event) error { // return nil } b, err := json.Marshal(res.Serialize(res)) - tools.NewNATSCaller().SetNATSPub(tools.CATALOG_SEARCH_EVENT, tools.NATSResponse{ + go tools.NewNATSCaller().SetNATSPub(tools.CATALOG_SEARCH_EVENT, tools.NATSResponse{ FromApp: "oc-discovery", Datatype: tools.DataType(event.DataType), Method: int(tools.CATALOG_SEARCH_EVENT), @@ -74,14 +74,14 @@ func (ps *StreamService) handleEventFromPartner(evt *common.Event, action tools. } case tools.PB_CREATE: case tools.PB_UPDATE: - tools.NewNATSCaller().SetNATSPub(tools.CREATE_RESOURCE, tools.NATSResponse{ + go tools.NewNATSCaller().SetNATSPub(tools.CREATE_RESOURCE, tools.NATSResponse{ FromApp: "oc-discovery", Datatype: tools.DataType(evt.DataType), Method: int(tools.CREATE_RESOURCE), Payload: b, }) case tools.PB_DELETE: - tools.NewNATSCaller().SetNATSPub(tools.REMOVE_RESOURCE, tools.NATSResponse{ + go tools.NewNATSCaller().SetNATSPub(tools.REMOVE_RESOURCE, tools.NATSResponse{ FromApp: "oc-discovery", Datatype: tools.DataType(evt.DataType), Method: int(tools.REMOVE_RESOURCE), diff --git a/daemons/node/stream/service.go b/daemons/node/stream/service.go index 0929af3..9550d4c 100644 --- a/daemons/node/stream/service.go +++ b/daemons/node/stream/service.go @@ -3,15 +3,18 @@ package stream import ( "context" "encoding/json" - "errors" "fmt" + "oc-discovery/conf" "oc-discovery/daemons/node/common" + "strings" "sync" "time" oclib "cloud.o-forge.io/core/oc-lib" "cloud.o-forge.io/core/oc-lib/dbs" "cloud.o-forge.io/core/oc-lib/models/peer" + "cloud.o-forge.io/core/oc-lib/models/utils" + "github.com/google/uuid" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" pp "github.com/libp2p/go-libp2p/core/peer" @@ -66,11 +69,11 @@ func (s *StreamService) HandlePartnerHeartbeat(stream network.Stream) { } s.mu.Lock() defer s.mu.Unlock() - streams := s.Streams[ProtocolHeartbeatPartner] - if streams == nil { + + if s.Streams[ProtocolHeartbeatPartner] == nil { s.Streams[ProtocolHeartbeatPartner] = map[pp.ID]*common.Stream{} - return } + streams := s.Streams[ProtocolHeartbeatPartner] // if record already seen update last seen if rec, ok := streams[*pid]; ok { rec.DID = hb.DID @@ -93,7 +96,7 @@ func (s *StreamService) connectToPartners() error { for _, p := range peers { ad, err := pp.AddrInfoFromString(p.StreamAddress) if err != nil { - return err + continue } pid, err := pp.Decode(p.PeerID) if err != nil { @@ -102,11 +105,26 @@ func (s *StreamService) connectToPartners() error { s.ConnectToPartner(pid, ad) // heartbeat your partner. } + for _, proto := range protocols { + f := func(ss network.Stream) { + if s.Streams[proto] == nil { + s.Streams[proto] = map[pp.ID]*common.Stream{} + } + s.Streams[proto][ss.Conn().RemotePeer()] = &common.Stream{ + Stream: ss, + Expiry: time.Now().UTC().Add(2 * time.Minute), + } + s.readLoop(s.Streams[proto][ss.Conn().RemotePeer()]) + } + fmt.Println("SetStreamHandler", proto) + s.Host.SetStreamHandler(proto, f) + } // TODO if handle... from partner then HeartBeat back return nil } func (s *StreamService) ConnectToPartner(pid pp.ID, ad *pp.AddrInfo) { + logger := oclib.GetLogger() for _, proto := range protocols { f := func(ss network.Stream) { if s.Streams[proto] == nil { @@ -114,26 +132,43 @@ func (s *StreamService) ConnectToPartner(pid pp.ID, ad *pp.AddrInfo) { } s.Streams[proto][pid] = &common.Stream{ Stream: ss, - Expiry: time.Now().Add(2 * time.Minute), + Expiry: time.Now().UTC().Add(2 * time.Minute), } - go s.readLoop(s.Streams[proto][pid]) + s.readLoop(s.Streams[proto][pid]) } - s.Streams = common.AddStreamProtocol(nil, s.Streams, s.Host, proto, pid, s.Key, &f) - if s.Streams[proto][pid] != nil { - go s.readLoop(s.Streams[proto][pid]) // reaaaad... + if s.Host.Network().Connectedness(ad.ID) != network.Connected { + if err := s.Host.Connect(context.Background(), *ad); err != nil { + logger.Err(err) + continue + } } + s.Streams = common.AddStreamProtocol(nil, s.Streams, s.Host, proto, pid, s.Key, false, &f) } common.SendHeartbeat(context.Background(), ProtocolHeartbeatPartner, - s.Host, s.Streams, []*pp.AddrInfo{ad}, time.Minute) + s.Host, s.Streams, []*pp.AddrInfo{ad}, 20*time.Second) } func (s *StreamService) searchPeer(search string) ([]*peer.Peer, error) { + /* TODO FOR TEST ONLY A VARS THAT DEFINE ADDRESS... deserialize */ ps := []*peer.Peer{} + if conf.GetConfig().PeerIDS != "" { + for _, peerID := range strings.Split(conf.GetConfig().PeerIDS, ",") { + ppID := strings.Split(peerID, ":") + ps = append(ps, &peer.Peer{ + AbstractObject: utils.AbstractObject{ + UUID: uuid.New().String(), + Name: ppID[1], + }, + PeerID: ppID[1], + StreamAddress: "/ip4/127.0.0.1/tcp/" + ppID[0] + "/p2p/" + ppID[1], + State: peer.ONLINE, + Relation: peer.PARTNER, + }) + } + } + access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil) peers := access.Search(nil, search, false) - if len(peers.Data) == 0 { - return ps, errors.New("no self available") - } for _, p := range peers.Data { ps = append(ps, p.(*peer.Peer)) } @@ -161,12 +196,12 @@ func (s *StreamService) StartGC(interval time.Duration) { func (s *StreamService) gc() { s.mu.Lock() defer s.mu.Unlock() - now := time.Now() - streams := s.Streams[ProtocolHeartbeatPartner] - if streams == nil { + now := time.Now().UTC() + + if s.Streams[ProtocolHeartbeatPartner] == nil { s.Streams[ProtocolHeartbeatPartner] = map[pp.ID]*common.Stream{} - return } + streams := s.Streams[ProtocolHeartbeatPartner] for pid, rec := range streams { if now.After(rec.Expiry) { for _, sstreams := range s.Streams { @@ -180,12 +215,12 @@ func (s *StreamService) gc() { } func (ps *StreamService) readLoop(s *common.Stream) { - dec := json.NewDecoder(s.Stream) + defer s.Stream.Close() for { var evt common.Event - if err := dec.Decode(&evt); err != nil { + if err := json.NewDecoder(s.Stream).Decode(&evt); err != nil { s.Stream.Close() - return + continue } ps.handleEvent(evt.Type, &evt) } diff --git a/main.go b/main.go index 20901d2..69cf07d 100644 --- a/main.go +++ b/main.go @@ -2,6 +2,7 @@ package main import ( "context" + "fmt" "log" "oc-discovery/conf" "oc-discovery/daemons/node" @@ -25,13 +26,15 @@ func main() { o := oclib.GetConfLoader() conf.GetConfig().Name = o.GetStringDefault("NAME", "opencloud-demo") - conf.GetConfig().Hostname = o.GetStringDefault("HOSTNAME", "localhost") + conf.GetConfig().Hostname = o.GetStringDefault("HOSTNAME", "127.0.0.1") conf.GetConfig().PSKPath = o.GetStringDefault("PSK_PATH", "./psk/psk.key") conf.GetConfig().NodeEndpointPort = o.GetInt64Default("NODE_ENDPOINT_PORT", 4001) conf.GetConfig().PublicKeyPath = o.GetStringDefault("PUBLIC_KEY_PATH", "./pem/public.pem") conf.GetConfig().PrivateKeyPath = o.GetStringDefault("PRIVATE_KEY_PATH", "./pem/private.pem") conf.GetConfig().IndexerAddresses = o.GetStringDefault("INDEXER_ADDRESSES", "") + conf.GetConfig().PeerIDS = o.GetStringDefault("PEER_IDS", "") + conf.GetConfig().NodeMode = o.GetStringDefault("NODE_MODE", "node") // Normal beego init @@ -53,7 +56,7 @@ func main() { syscall.SIGTERM, ) defer stop() - + fmt.Println(conf.GetConfig().NodeMode) isNode := strings.Contains(conf.GetConfig().NodeMode, "node") isIndexer := strings.Contains(conf.GetConfig().NodeMode, "indexer") diff --git a/models/event.go b/models/event.go index dbdc2a6..43bae00 100644 --- a/models/event.go +++ b/models/event.go @@ -25,7 +25,7 @@ func NewEvent(name string, from string, dt *tools.DataType, user string, payload Type: name, From: from, User: user, - Timestamp: time.Now().Unix(), + Timestamp: time.Now().UTC().Unix(), Payload: payload, } if dt != nil { diff --git a/oc-k8s b/oc-k8s new file mode 100755 index 0000000..023738e Binary files /dev/null and b/oc-k8s differ