248 lines
7.0 KiB
Go
248 lines
7.0 KiB
Go
package node
|
|
|
|
import (
|
|
"context"
|
|
"crypto/sha256"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"oc-discovery/conf"
|
|
"oc-discovery/daemons/node/common"
|
|
"oc-discovery/daemons/node/indexer"
|
|
"oc-discovery/daemons/node/pubsub"
|
|
"oc-discovery/daemons/node/stream"
|
|
"time"
|
|
|
|
oclib "cloud.o-forge.io/core/oc-lib"
|
|
"cloud.o-forge.io/core/oc-lib/models/peer"
|
|
"github.com/libp2p/go-libp2p"
|
|
pubsubs "github.com/libp2p/go-libp2p-pubsub"
|
|
pp "github.com/libp2p/go-libp2p/core/peer"
|
|
)
|
|
|
|
type Node struct {
|
|
*common.LongLivedStreamRecordedService[interface{}] // change type of stream
|
|
PS *pubsubs.PubSub
|
|
IndexerService *indexer.IndexerService
|
|
PubSubService *pubsub.PubSubService
|
|
StreamService *stream.StreamService
|
|
PeerID pp.ID
|
|
isIndexer bool
|
|
}
|
|
|
|
func InitNode(isNode bool, isIndexer bool) (*Node, error) {
|
|
if !isNode && !isIndexer {
|
|
return nil, errors.New("wait... what ? your node need to at least something. Retry we can't be friend in that case")
|
|
}
|
|
logger := oclib.GetLogger()
|
|
logger.Info().Msg("retrieving private key...")
|
|
priv, err := common.LoadKeyFromFilePrivate() // your node private key
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
logger.Info().Msg("retrieving psk file...")
|
|
psk, err := common.LoadPSKFromFile() // network common private Network. Public OC PSK is Public Network
|
|
if err != nil {
|
|
return nil, nil
|
|
}
|
|
logger.Info().Msg("open a host...")
|
|
h, err := libp2p.New(
|
|
libp2p.PrivateNetwork(psk),
|
|
libp2p.Identity(priv),
|
|
libp2p.ListenAddrStrings(
|
|
fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", conf.GetConfig().NodeEndpointPort),
|
|
),
|
|
)
|
|
if err != nil {
|
|
return nil, errors.New("no host no node")
|
|
}
|
|
node := &Node{
|
|
PeerID: h.ID(),
|
|
isIndexer: isIndexer,
|
|
LongLivedStreamRecordedService: common.NewStreamRecordedService[interface{}](h, 1000, false),
|
|
}
|
|
var ps *pubsubs.PubSub
|
|
if isNode {
|
|
logger.Info().Msg("generate opencloud node...")
|
|
ps, err = pubsubs.NewGossipSub(context.Background(), node.Host)
|
|
if err != nil {
|
|
panic(err) // can't run your node without a propalgation pubsub, of state of node.
|
|
}
|
|
node.PS = ps
|
|
logger.Info().Msg("connect to indexers...")
|
|
common.ConnectToIndexers(node.Host, 0, 5, node.PeerID) // TODO : make var to change how many indexers are allowed.
|
|
logger.Info().Msg("claims my node...")
|
|
node.claimInfo(conf.GetConfig().Name, conf.GetConfig().Hostname)
|
|
logger.Info().Msg("subscribe to node activity...")
|
|
node.SubscribeToNodeActivity(node.PS) // now we subscribe to a long run topic named node-activity, to relay message.
|
|
logger.Info().Msg("subscribe to decentralized search flow...")
|
|
node.SubscribeToSearch(node.PS)
|
|
logger.Info().Msg("run garbage collector...")
|
|
node.StartGC(30 * time.Second)
|
|
|
|
if node.StreamService, err = stream.InitStream(context.Background(), node.Host, node.PeerID, 1000, node); err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
if node.PubSubService, err = pubsub.InitPubSub(context.Background(), node.Host, node.PS, node, node.StreamService); err != nil {
|
|
panic(err)
|
|
}
|
|
}
|
|
if isIndexer {
|
|
logger.Info().Msg("generate opencloud indexer...")
|
|
node.IndexerService = indexer.NewIndexerService(node.Host, ps, 5)
|
|
}
|
|
logger.Info().Msg("connect to NATS")
|
|
ListenNATS(*node)
|
|
logger.Info().Msg("Node is actually running.")
|
|
return node, nil
|
|
}
|
|
|
|
func (d *Node) Close() {
|
|
if d.isIndexer {
|
|
d.IndexerService.Close()
|
|
}
|
|
d.PubSubService.Close()
|
|
d.StreamService.Close()
|
|
d.Host.Close()
|
|
}
|
|
|
|
func (d *Node) publishPeerRecord(
|
|
rec *indexer.PeerRecord,
|
|
) error {
|
|
priv, err := common.LoadKeyFromFilePrivate() // your node private key
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if common.StreamIndexers[common.ProtocolPublish] == nil {
|
|
return errors.New("no protocol Publish is set up on the node")
|
|
}
|
|
for _, ad := range common.StaticIndexers {
|
|
if common.StreamIndexers[common.ProtocolPublish][ad.ID] == nil {
|
|
return errors.New("no protocol Publish for peer " + ad.ID.String() + " is set up on the node")
|
|
}
|
|
stream := common.StreamIndexers[common.ProtocolPublish][ad.ID]
|
|
base := indexer.PeerRecord{
|
|
Name: rec.Name,
|
|
DID: rec.DID,
|
|
PubKey: rec.PubKey,
|
|
ExpiryDate: time.Now().UTC().Add(2 * time.Minute),
|
|
}
|
|
payload, _ := json.Marshal(base)
|
|
hash := sha256.Sum256(payload)
|
|
|
|
rec.ExpiryDate = base.ExpiryDate
|
|
rec.Signature, err = priv.Sign(hash[:])
|
|
rec.TTL = 2
|
|
if err := json.NewEncoder(stream.Stream).Encode(&rec); err != nil { // then publish on stream
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (d *Node) GetPeerRecord(
|
|
ctx context.Context,
|
|
key string,
|
|
) (*peer.Peer, error) {
|
|
var err error
|
|
var info *indexer.PeerRecord
|
|
if common.StreamIndexers[common.ProtocolPublish] == nil {
|
|
return nil, errors.New("no protocol Publish is set up on the node")
|
|
}
|
|
for _, ad := range common.StaticIndexers {
|
|
if common.StreamIndexers[common.ProtocolPublish][ad.ID] == nil {
|
|
return nil, errors.New("no protocol Publish for peer " + ad.ID.String() + " is set up on the node")
|
|
}
|
|
stream := common.StreamIndexers[common.ProtocolPublish][ad.ID]
|
|
if err := json.NewEncoder(stream.Stream).Encode(indexer.GetValue{Key: key}); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var resp indexer.GetResponse
|
|
if err := json.NewDecoder(stream.Stream).Decode(&resp); err != nil {
|
|
return nil, err
|
|
}
|
|
if resp.Found {
|
|
info = &resp.Record
|
|
break
|
|
}
|
|
}
|
|
var p *peer.Peer
|
|
if info != nil {
|
|
if pk, err := info.Verify(); err != nil {
|
|
return nil, err
|
|
} else if ok, p, err := info.ExtractPeer(d.PeerID.String(), key, pk); err != nil {
|
|
return nil, err
|
|
} else {
|
|
if ok {
|
|
d.publishPeerRecord(info)
|
|
}
|
|
return p, nil
|
|
}
|
|
}
|
|
return p, err
|
|
}
|
|
|
|
func (d *Node) claimInfo(
|
|
name string,
|
|
endPoint string, // TODO : endpoint is not necesserry StreamAddress
|
|
) (*peer.Peer, error) {
|
|
if endPoint == "" {
|
|
return nil, errors.New("no endpoint found for peer")
|
|
}
|
|
|
|
peerID, err := oclib.GenerateNodeID()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
priv, err := common.LoadKeyFromFilePrivate()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
pub, err := common.LoadKeyFromFilePublic()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
pubBytes, _ := pub.Raw()
|
|
|
|
now := time.Now()
|
|
expiry := now.Add(150 * time.Second)
|
|
|
|
rec := &indexer.PeerRecord{
|
|
Name: name,
|
|
DID: peerID, // REAL PEER ID
|
|
PubKey: pubBytes,
|
|
}
|
|
|
|
rec.PeerID = d.Host.ID().String()
|
|
d.PeerID = d.Host.ID()
|
|
|
|
payload, _ := json.Marshal(rec)
|
|
hash := sha256.Sum256(payload)
|
|
|
|
rec.Signature, err = priv.Sign(hash[:])
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
rec.APIUrl = endPoint
|
|
rec.StreamAddress = "/ip4/" + conf.GetConfig().Hostname + " /tcp/" + fmt.Sprintf("%v", conf.GetConfig().NodeEndpointPort) + " /p2p/" + rec.PeerID
|
|
rec.NATSAddress = oclib.GetConfig().NATSUrl
|
|
rec.WalletAddress = "my-wallet"
|
|
rec.ExpiryDate = expiry
|
|
|
|
if err := d.publishPeerRecord(rec); err != nil {
|
|
return nil, err
|
|
}
|
|
if pk, err := rec.Verify(); err != nil {
|
|
return nil, err
|
|
} else {
|
|
_, p, err := rec.ExtractPeer(peerID, peerID, pk)
|
|
return p, err
|
|
}
|
|
}
|