// Package dht publishes the local peer on a private libp2p Kademlia DHT and
// resolves peer records from it, forwarding discovered peers over NATS.
package dht

import (
	"context"
	"encoding/base64"
	"encoding/json"
	"errors"
	"fmt"
	"oc-discovery/conf"
	"oc-discovery/daemons"
	"slices"
	"strings"
	"sync"
	"time"

	oclib "cloud.o-forge.io/core/oc-lib"
	pp "cloud.o-forge.io/core/oc-lib/models/peer"
	"cloud.o-forge.io/core/oc-lib/models/utils"
	"cloud.o-forge.io/core/oc-lib/tools"
	"github.com/google/uuid"
	"github.com/libp2p/go-libp2p"
	dht "github.com/libp2p/go-libp2p-kad-dht"
	kad_dht "github.com/libp2p/go-libp2p-kad-dht"
	"github.com/libp2p/go-libp2p/core/crypto"
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/peer"
)

const (
	// peerKeyPrefix is prepended to the node ID to build a peer's DHT key.
	peerKeyPrefix = "/opencloud/peer/"
	// recordTTL is how long a claimed record is considered alive. It must stay
	// above the heartbeat interval used by Init (2 minutes).
	recordTTL = 150 * time.Second
)

// DHTRecord is the JSON document stored in the DHT for one peer.
//
// Signature covers only the JSON encoding of a record holding Name, PeerID
// and PubKey (see ClaimName and treatPeer); the remaining fields are not
// signed.
type DHTRecord struct {
	Name       string    `json:"name"`
	State      int       `json:"state"`
	DID        string    `json:"did"`
	PeerID     string    `json:"peer_id"`
	PubKey     []byte    `json:"pub_key"`
	URL        string    `json:"url"`
	NATSUrl    string    `json:"nats_url"`
	Wallet     string    `json:"wallet"`
	Signature  []byte    `json:"signature"`
	ExpiryDate time.Time `json:"expiry_date"`
}

// DHTService bundles the libp2p host, the Kademlia DHT built on it, and the
// list of keys this node has written or successfully read.
type DHTService struct {
	Key   string       // DHT key of the local peer, set by ClaimName
	Host  host.Host    // libp2p host backing the DHT
	DHT   *dht.IpfsDHT // Kademlia DHT instance
	Cache []string     // keys known to this node; guarded by mutex
	mutex sync.RWMutex
}

// TODO kick connection to base... and send on NATS boy

// dhtSingletonService is the service created by the last successful Init.
var dhtSingletonService *DHTService

// GetDHTService returns the service created by Init, or nil if Init has not
// completed successfully.
func GetDHTService() *DHTService {
	return dhtSingletonService
}

// Init creates the libp2p host and DHT, connects to the configured bootstrap
// peers, registers the service as the package singleton and, when the local
// key pair verifies, claims the local peer name and starts the heartbeat and
// cache refresh loops.
//
// ctx bounds the set-up calls and the lifetime of the background loops. On
// failure every resource created so far is closed before returning.
func Init(ctx context.Context) (*DHTService, error) {
	priv, err := daemons.LoadKeyFromFile(false)
	if err != nil {
		return nil, err
	}
	psk, err := daemons.LoadPSKFromFile()
	if err != nil {
		return nil, err
	}
	h, err := libp2p.New(
		libp2p.PrivateNetwork(psk),
		libp2p.Identity(priv),
		libp2p.ListenAddrStrings(
			fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", conf.GetConfig().DHTEndpointPort),
		),
	)
	if err != nil {
		return nil, err
	}

	service := &DHTService{Host: h}
	// closeAll releases what has been created so far; used on error paths only.
	closeAll := func() {
		if service.DHT != nil {
			_ = service.DHT.Close()
		}
		_ = h.Close()
	}

	// Every day the DHT will purge expired data... if not used.
	// NOTE(review): kad-dht normally only accepts keys whose namespace has a
	// registered record validator; none is configured here for "/opencloud".
	// Confirm PutValue/GetValue succeed against a real network.
	service.DHT, err = kad_dht.New(ctx, h, kad_dht.MaxRecordAge(24*time.Hour))
	if err != nil {
		closeAll()
		return nil, err
	}
	if err := service.DHT.Bootstrap(ctx); err != nil {
		closeAll()
		return nil, err
	}

	logger := oclib.GetLogger()
	for _, address := range strings.Split(conf.GetConfig().BootstrapAddresses, ",") {
		address = strings.TrimSpace(address)
		if address == "" {
			// strings.Split yields [""] for an empty setting; nothing to dial.
			continue
		}
		pi, err := peer.AddrInfoFromString(address)
		if err != nil {
			closeAll()
			return nil, fmt.Errorf("parsing bootstrap address %q: %w", address, err)
		}
		if err := h.Connect(ctx, *pi); err != nil {
			// An unreachable bootstrap peer is not fatal; report it and move on.
			logger.Err(err).Msg(fmt.Sprintf("Failed to connect to MAIN bootstrap peer %s", pi.ID))
			continue
		}
		logger.Info().Msg(fmt.Sprintf("Connected to MAIN bootstrap peer %s", pi.ID))
	}

	dhtSingletonService = service
	if daemons.VerifyPubWithPriv() {
		if _, err := service.ClaimName(ctx, conf.GetConfig().Name, conf.GetConfig().Hostname, false); err != nil {
			logger.Err(err).Msg("claiming peer name on the DHT failed; heartbeat not started")
		} else {
			// Both calls start their own goroutine and return immediately.
			service.Heartbeat(ctx, 2*time.Minute)
			service.RefreshKeys(ctx, 30*time.Minute)
		}
	}
	return service, nil
}

// Heartbeat starts a goroutine that re-claims the local peer name every
// interval so the published record does not expire. The goroutine stops when
// ctx is done. Heartbeat itself returns immediately.
//
// interval must be positive (time.NewTicker panics otherwise).
func (d *DHTService) Heartbeat(ctx context.Context, interval time.Duration) {
	ticker := time.NewTicker(interval)
	go func() {
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				if _, err := d.ClaimName(ctx, conf.GetConfig().Name, conf.GetConfig().Hostname, true); err != nil {
					logger := oclib.GetLogger()
					logger.Err(err).Msg("DHT heartbeat failed")
				}
			}
		}
	}()
}

// RefreshKeys starts a goroutine that re-reads every cached key each interval.
// The goroutine stops when ctx is done. RefreshKeys itself returns
// immediately.
//
// interval must be positive (time.NewTicker panics otherwise).
func (d *DHTService) RefreshKeys(ctx context.Context, interval time.Duration) {
	ticker := time.NewTicker(interval)
	go func() {
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				// Work on a snapshot: GetValue mutates the cache.
				d.mutex.RLock()
				keys := slices.Clone(d.Cache)
				d.mutex.RUnlock()
				for _, key := range keys {
					// Errors are deliberately ignored: GetValue already drops
					// keys whose lookup fails from the cache.
					_, _ = d.GetValue(ctx, key)
				}
			}
		}
	}()
}

// remember adds key to the cache if it is not already present.
func (d *DHTService) remember(key string) {
	d.mutex.Lock()
	defer d.mutex.Unlock()
	if !slices.Contains(d.Cache, key) {
		d.Cache = append(d.Cache, key)
	}
}

// forget removes every occurrence of key from the cache.
func (d *DHTService) forget(key string) {
	d.mutex.Lock()
	defer d.mutex.Unlock()
	d.Cache = slices.DeleteFunc(d.Cache, func(c string) bool { return c == key })
}

// PutValue stores value under key in the DHT and, on success, records the key
// in the cache.
func (d *DHTService) PutValue(
	ctx context.Context,
	key string,
	value []byte,
) error {
	if err := d.DHT.PutValue(ctx, key, value); err != nil {
		return err
	}
	d.remember(key)
	return nil
}

// publicKeyString returns the base64 (standard encoding) of the raw key bytes
// for a record's PubKey field. When the field cannot be decoded as a
// protobuf-serialised libp2p key, its bytes are encoded as they are.
func publicKeyString(recPubKey []byte) string {
	if pk, err := crypto.UnmarshalPublicKey(recPubKey); err == nil {
		if raw, err := pk.Raw(); err == nil {
			return base64.StdEncoding.EncodeToString(raw)
		}
	}
	return base64.StdEncoding.EncodeToString(recPubKey)
}

// publishPeer sends p on NATS as a CREATE_PEER resource message. It returns an
// error only when p cannot be JSON-encoded.
func publishPeer(p *pp.Peer) error {
	b, err := json.Marshal(p)
	if err != nil {
		return err
	}
	tools.NewNATSCaller().SetNATSPub(tools.CREATE_RESOURCE, tools.NATSResponse{
		FromApp:  "oc-discovery",
		Datatype: tools.PEER,
		Method:   int(tools.CREATE_PEER),
		Payload:  b,
	})
	return nil
}

// GetValue reads the record stored under key, keeps the cache in sync (the key
// is dropped when the lookup fails and added when it succeeds), publishes the
// record on NATS as a peer, and returns the decoded record.
//
// NOTE(review): the published peer is always marked pp.SELF and carries the
// local node ID and NATS URL rather than the record's; that matches the
// current callers, which only ever look up the local key — confirm before
// using GetValue for remote keys.
func (d *DHTService) GetValue(
	ctx context.Context,
	key string,
) (*DHTRecord, error) {
	raw, err := d.DHT.GetValue(ctx, key)
	if err != nil {
		d.forget(key)
		return nil, err
	}
	d.remember(key)

	var data DHTRecord
	if err := json.Unmarshal(raw, &data); err != nil {
		return nil, fmt.Errorf("decoding DHT record %q: %w", key, err)
	}
	peerID, err := oclib.GenerateNodeID()
	if err != nil {
		return nil, err
	}
	p := &pp.Peer{
		AbstractObject: utils.AbstractObject{
			UUID: uuid.New().String(),
			Name: data.Name,
		},
		State:    pp.ONLINE,
		Relation: pp.SELF,
		PeerID:   peerID,
		// Base64, like the other publish sites: raw key bytes are not valid
		// UTF-8 and would be mangled by the JSON encoder.
		PublicKey:     publicKeyString(data.PubKey),
		Url:           data.URL,
		NATSUrl:       oclib.GetConfig().NATSUrl,
		WalletAddress: data.Wallet,
	}
	if err := publishPeer(p); err != nil {
		return nil, err
	}
	/*if founded, _, err := access.Search(nil, fmt.Sprintf("%v", pp.SELF.EnumIndex()), false); err == nil && len(founded) > 0 && founded[0].(*pp.Peer).Relation != pp.BLACKLIST {
		f.(*pp.Peer).State = pp.ONLINE
		f.(*pp.Peer).NATSUrl = p.NATSUrl
		f.(*pp.Peer).Url = p.Url
		f.(*pp.Peer).PeerID = p.PeerID
		f.(*pp.Peer).Relation = p.Relation
		f.(*pp.Peer).WalletAddress = p.WalletAddress
		access.UpdateOne(f, f.GetID())
	}*/
	return &data, nil
}

// generateKey returns the DHT key of the local node: peerKeyPrefix followed by
// the node ID reported by oclib.
func (d *DHTService) generateKey() (string, error) {
	s, err := oclib.GenerateNodeID()
	if err != nil {
		return s, err
	}
	return peerKeyPrefix + s, nil
}

// ClaimName creates (or refreshes) the local peer's record in the DHT and
// publishes the resulting peer on NATS.
//
// name is the peer's display name and endPoint its URL; endPoint must not be
// empty. Unless avoidVerification is true, an existing readable record under
// the local key that carries a different PeerID makes the claim fail. The
// record expires recordTTL after the call.
//
// The record stores the public key in libp2p's protobuf form
// (crypto.MarshalPublicKey) because treatPeer decodes it with
// crypto.UnmarshalPublicKey.
func (d *DHTService) ClaimName(
	ctx context.Context,
	name string,
	endPoint string,
	avoidVerification bool,
) (*pp.Peer, error) {
	if endPoint == "" {
		return nil, errors.New("no endpoint found for peer " + name)
	}
	peerID, err := oclib.GenerateNodeID()
	if err != nil {
		return nil, err
	}

	pub := d.Host.Peerstore().PubKey(d.Host.ID())
	if pub == nil {
		return nil, errors.New("no public key found for local host")
	}
	rawPub, err := pub.Raw()
	if err != nil {
		return nil, fmt.Errorf("reading raw public key: %w", err)
	}
	recPub, err := crypto.MarshalPublicKey(pub)
	if err != nil {
		return nil, fmt.Errorf("serialising public key: %w", err)
	}

	// Sign only the identity part of the record (name, peer ID, public key).
	rec := DHTRecord{
		Name:   name,
		PeerID: peerID,
		PubKey: recPub,
	}
	payload, err := json.Marshal(rec)
	if err != nil {
		return nil, fmt.Errorf("encoding record to sign: %w", err)
	}
	// The type of Sign's second result is not visible from this file, so an
	// empty signature is what is treated as failure.
	sig, signErr := daemons.Sign(d.Host.Peerstore().PrivKey(d.Host.ID()), payload)
	if len(sig) == 0 {
		return nil, fmt.Errorf("signing peer record: %v", signErr)
	}

	rec.Signature = sig
	rec.URL = endPoint
	rec.NATSUrl = oclib.GetConfig().NATSUrl
	rec.State = pp.ONLINE.EnumIndex()
	rec.ExpiryDate = time.Now().Add(recordTTL)
	data, err := json.Marshal(rec)
	if err != nil {
		return nil, fmt.Errorf("encoding peer record: %w", err)
	}

	key, err := d.generateKey()
	if err != nil {
		return nil, err
	}
	if !avoidVerification {
		// Check whether someone claimed this key before. A failed lookup
		// (missing or undecodable record) does not block the claim.
		if old, err := d.GetValue(ctx, key); err == nil && old.PeerID != peerID {
			return nil, errors.New("name already claimed by another peer")
		}
	}
	if err := d.PutValue(ctx, key, data); err != nil {
		return nil, err
	}
	d.Key = key

	p := &pp.Peer{
		AbstractObject: utils.AbstractObject{
			UUID: uuid.New().String(),
			Name: name,
		},
		State:         pp.ONLINE,
		Relation:      pp.SELF,
		PeerID:        peerID,
		PublicKey:     base64.StdEncoding.EncodeToString(rawPub),
		Url:           endPoint,
		NATSUrl:       oclib.GetConfig().NATSUrl,
		WalletAddress: "my-wallet",
	}
	if err := publishPeer(p); err != nil {
		return nil, err
	}
	return p, nil
}

// DiscoverPeers searches the DHT and returns every record that passes
// treatPeer. It stops at the first record that is not valid JSON and returns
// the peers collected so far together with the decoding error.
//
// NOTE(review): name is currently unused; the search key comes from
// generateKey, i.e. the local node's own key — confirm the intended key
// scheme.
func (d *DHTService) DiscoverPeers(ctx context.Context, name string) ([]*pp.Peer, error) {
	peers := []*pp.Peer{}
	key, err := d.generateKey()
	if err != nil {
		return nil, err
	}

	// Cancelling on return stops the search if we leave the loop early.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	values, err := d.DHT.SearchValue(ctx, key)
	if err != nil {
		// Ranging over the nil channel returned on error would block forever.
		return nil, err
	}
	for raw := range values {
		var rec DHTRecord
		if err := json.Unmarshal(raw, &rec); err != nil {
			return peers, err
		}
		if p, err := d.treatPeer(ctx, key, &rec); err == nil {
			peers = append(peers, p)
		}
	}
	return peers, nil
}

// GetPeer reads a single record from the DHT and converts it with treatPeer.
//
// NOTE(review): name is currently unused; the lookup key comes from
// generateKey, i.e. the local node's own key — confirm the intended key
// scheme.
func (d *DHTService) GetPeer(ctx context.Context, name string) (*pp.Peer, error) {
	key, err := d.generateKey()
	if err != nil {
		return nil, err
	}
	data, err := d.GetValue(ctx, key)
	if err != nil {
		return nil, fmt.Errorf("DHT peer not found: %w", err)
	}
	return d.treatPeer(ctx, key, data)
}

// treatPeer validates rec and converts it into a peer.
//
// It decodes the record's public key, checks the signature over the record's
// name, peer ID and public key, and builds the peer (relation pp.SELF when key
// is the local key, pp.NONE otherwise). An expired record is written back to
// the DHT with state OFFLINE, published on NATS as offline, and reported as an
// error. rec.State is modified in that case.
func (d *DHTService) treatPeer(ctx context.Context, key string, rec *DHTRecord) (*pp.Peer, error) {
	if rec == nil {
		return nil, errors.New("nil DHT record")
	}
	pubKey, err := crypto.UnmarshalPublicKey(rec.PubKey)
	if err != nil {
		return nil, err
	}

	// Rebuild exactly what ClaimName signed.
	signed := DHTRecord{
		Name:   rec.Name,
		PeerID: rec.PeerID,
		PubKey: rec.PubKey,
	}
	payload, err := json.Marshal(signed)
	if err != nil {
		return nil, fmt.Errorf("encoding record to verify: %w", err)
	}
	// A verification error is treated the same as a bad signature.
	if ok, _ := daemons.Verify(pubKey, payload, rec.Signature); !ok {
		return nil, errors.New("invalid signature")
	}

	rawPub, err := pubKey.Raw()
	if err != nil {
		return nil, fmt.Errorf("reading raw public key: %w", err)
	}
	rel := pp.NONE
	if d.Key == key {
		rel = pp.SELF
	}
	p := &pp.Peer{
		AbstractObject: utils.AbstractObject{
			UUID: uuid.New().String(),
			Name: rec.Name,
		},
		State:         pp.ONLINE,
		Relation:      rel,
		PeerID:        rec.PeerID,
		PublicKey:     base64.StdEncoding.EncodeToString(rawPub),
		Url:           rec.URL,
		NATSUrl:       rec.NATSUrl,
		WalletAddress: rec.Wallet,
	}

	if time.Now().After(rec.ExpiryDate) { // is expired
		rec.State = pp.OFFLINE.EnumIndex()
		p.State = pp.OFFLINE
		// Best effort: failing to mark the record offline must not hide the
		// expiry from the caller.
		if offline, err := json.Marshal(rec); err == nil {
			_ = d.PutValue(ctx, key, offline)
		}
		if err := publishPeer(p); err != nil {
			return nil, err
		}
		return nil, errors.New("peer " + key + " is expired")
	}
	// NOTE(review): p.State is always pp.ONLINE here, so this branch cannot be
	// taken; presumably rec.State was meant to be inspected — confirm.
	if p.State == pp.OFFLINE {
		return nil, errors.New("peer " + key + " is offline")
	}
	return p, nil
}

// TODO : HEARTBEAT