Files
oc-peer/infrastructure/dht.go
2026-01-15 13:35:11 +01:00

337 lines
8.0 KiB
Go

package infrastructure
import (
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"oc-peer/conf"
"time"
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/models/peer"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
"github.com/google/uuid"
"github.com/libp2p/go-libp2p"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
)
type DHTRecord struct {
Name string
State int
PeerID string
PubKey []byte
URL string
Signature []byte
ExpiryDate time.Time
}
type DHTService struct {
Key string
Host host.Host
DHT *dht.IpfsDHT
}
var singletonService *DHTService
func GetDHTService() *DHTService {
return singletonService
}
func Init(ctx context.Context) (*DHTService, error) {
service := &DHTService{}
priv, err := loadKeyFromFile(false)
if err != nil {
return nil, err
}
h, err := libp2p.New(
libp2p.Identity(priv),
libp2p.ListenAddrStrings(
fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", conf.GetConfig().DHTEndpointPort),
),
)
if err != nil {
return nil, err
}
service.Host = h
service.DHT, err = dht.New(ctx, h, dht.MaxRecordAge(24*time.Hour)) // every day DHT will purge expired data... if not used.
if err != nil {
return nil, err
}
singletonService = service
for {
if VerifyPubWithPriv() {
o := oclib.GetConfLoader()
if _, err := singletonService.ClaimName(context.Background(), o.GetStringDefault("NAME", "local"), o.GetStringDefault("HOSTNAME", "http://localhost")); err == nil {
go func() {
for {
singletonService.RefreshName(context.Background())
time.Sleep(59 * time.Minute)
}
}()
}
}
break
}
return service, service.DHT.Bootstrap(ctx)
}
// Create your peer.
func (d *DHTService) ClaimName(
ctx context.Context,
name string,
endPoint string,
) (*peer.Peer, error) {
if endPoint == "" {
return nil, errors.New("no endpoint found for peer" + name)
}
pub := d.Host.Peerstore().PubKey(d.Host.ID())
pubBytes, _ := pub.Raw()
now := time.Now()
expiry := now.Add(1 * time.Hour)
rec := DHTRecord{
Name: name,
PeerID: d.Host.ID().String(),
PubKey: pubBytes,
}
payload, _ := json.Marshal(rec)
sig, _ := sign(d.Host.Peerstore().PrivKey(d.Host.ID()), payload)
rec.Signature = sig
rec.URL = endPoint
rec.State = peer.ONLINE.EnumIndex()
rec.ExpiryDate = expiry
data, _ := json.Marshal(rec)
key := name
// retrieve your key name in standard
existing, err := d.DHT.GetValue(ctx, key)
if err == nil && existing != nil {
var old DHTRecord
_ = json.Unmarshal(existing, &old)
if old.PeerID != d.Host.ID().String() { // check if someone claims your name before
return nil, errors.New("name already claimed by another peer")
}
if now.After(old.ExpiryDate) {
payload, _ := json.Marshal(rec)
d.DHT.PutValue(ctx, key, payload)
}
}
if err := d.DHT.PutValue(ctx, key, data); err != nil {
return nil, err
}
pubStr := base64.StdEncoding.EncodeToString(pubBytes)
d.Key = key
access := peer.NewAccessor(&tools.APIRequest{Admin: true})
p := &peer.Peer{
AbstractObject: utils.AbstractObject{
UUID: uuid.New().String(),
Name: name,
},
State: peer.ONLINE,
Relation: peer.SELF,
PeerID: d.Host.ID().String(),
PublicKey: pubStr,
Url: endPoint,
WalletAddress: "my-wallet",
}
if founded, _, err := access.Search(nil, fmt.Sprintf("%v", peer.SELF.EnumIndex()), false); err != nil || len(founded) == 0 {
access.StoreOne(p)
} else if f, _, err := access.LoadOne(founded[0].GetID()); err == nil {
f.(*peer.Peer).Name = name
f.(*peer.Peer).PeerID = d.Host.ID().String()
f.(*peer.Peer).State = peer.ONLINE
f.(*peer.Peer).Url = endPoint
access.UpdateOne(f, f.GetID())
}
return p, nil
}
func (d *DHTService) treatPeer(ctx context.Context, key string, data []byte) (*peer.Peer, error) {
var rec DHTRecord
if err := json.Unmarshal(data, &rec); err != nil {
return nil, err
}
pubKey, err := crypto.UnmarshalPublicKey(rec.PubKey)
if err != nil {
return nil, err
}
now := time.Now()
dht := DHTRecord{
Name: rec.Name,
PeerID: rec.PeerID,
PubKey: rec.PubKey,
}
payload, _ := json.Marshal(dht)
if ok, _ := verify(pubKey, payload, rec.Signature); !ok {
return nil, errors.New("invalid signature")
}
pubBytes, _ := pubKey.Raw()
pubStr := base64.StdEncoding.EncodeToString(pubBytes)
rel := peer.NONE
if d.Key == key {
rel = peer.SELF
}
p := &peer.Peer{
AbstractObject: utils.AbstractObject{
UUID: uuid.New().String(),
Name: rec.Name,
},
State: peer.ONLINE,
Relation: rel,
PeerID: rec.PeerID,
PublicKey: pubStr,
Url: rec.URL,
}
access := peer.NewAccessor(&tools.APIRequest{Admin: true})
if now.After(rec.ExpiryDate) {
rec.State = peer.OFFLINE.EnumIndex()
p.State = peer.OFFLINE
payload, _ := json.Marshal(rec)
d.DHT.PutValue(ctx, key, payload)
if founded, _, err := access.Search(nil, p.Name, false); err != nil || len(founded) == 0 {
access.StoreOne(p)
} else if f, _, err := access.LoadOne(founded[0].GetID()); err == nil {
f.(*peer.Peer).State = peer.OFFLINE
access.UpdateOne(f, f.GetID())
}
return nil, errors.New("peer " + key + " is expired")
}
if rec.State == peer.OFFLINE.EnumIndex() {
if founded, _, err := access.Search(nil, p.Name, false); err != nil || len(founded) == 0 {
pp, _, err := access.StoreOne(p)
if err == nil {
return pp.(*peer.Peer), nil
}
} else {
f, ok := peer.CheckPeerStatus(founded[0].GetID(), "")
if ok {
f.State = peer.ONLINE
} else {
f.State = peer.OFFLINE
}
pp, _, err := access.UpdateOne(f, f.GetID())
if err == nil && ok {
return pp.(*peer.Peer), nil
}
}
}
return nil, errors.New("peer " + key + " is offline")
}
func (d *DHTService) GetPeer(
ctx context.Context,
name string,
) (*peer.Peer, error) {
key := name
data, err := d.DHT.GetValue(ctx, key)
if err != nil {
return nil, errors.New("no DHT peer not found")
}
return d.treatPeer(ctx, key, data)
}
// Discover a specific Peer
func (d *DHTService) DiscoverPeers(
ctx context.Context,
name string,
) ([]*peer.Peer, error) {
peers := []*peer.Peer{}
key := name
datas, err := d.DHT.SearchValue(ctx, key)
if err != nil {
return nil, errors.New("no DHT peer not found")
}
for data := range datas {
if p, err := d.treatPeer(ctx, key, data); err == nil {
peers = append(peers, p)
}
}
return peers, nil
}
func (d *DHTService) existsDHT(ctx context.Context) (*DHTRecord, error) {
if d.Key == "" {
return nil, errors.New("no self peer found")
}
data, err := d.DHT.GetValue(ctx, d.Key)
if err != nil {
return nil, errors.New("no DHT peer found")
}
var rec DHTRecord
if err := json.Unmarshal(data, &rec); err != nil {
return &rec, err
}
pubKey, err := crypto.UnmarshalPublicKey(rec.PubKey)
if err != nil {
return &rec, err
}
dht := DHTRecord{
Name: rec.Name,
PeerID: rec.PeerID,
PubKey: rec.PubKey,
URL: rec.URL,
}
payload, _ := json.Marshal(dht)
if ok, _ := verify(pubKey, payload, rec.Signature); !ok {
return &rec, err
}
return &rec, nil
}
func (d *DHTService) RefreshName( // peer should regulary refresh your host to not be settings up offline.
ctx context.Context,
) error {
rec, err := d.existsDHT(ctx)
if err != nil {
return err
}
now := time.Now()
expiry := now.Add(1 * time.Hour)
rec.State = peer.ONLINE.EnumIndex()
rec.ExpiryDate = expiry
payload, _ := json.Marshal(rec)
d.DHT.PutValue(ctx, d.Key, payload)
access := peer.NewAccessor(&tools.APIRequest{Admin: true})
p := &peer.Peer{
AbstractObject: utils.AbstractObject{
UUID: uuid.New().String(),
Name: rec.Name,
},
State: peer.ONLINE, // online
Relation: peer.SELF, // self
PeerID: rec.PeerID,
PublicKey: string(rec.PubKey),
Url: rec.URL,
}
if founded, _, err := access.Search(nil, rec.Name, false); err != nil || len(founded) == 0 {
access.StoreOne(p)
} else if f, _, err := access.LoadOne(founded[0].GetID()); err == nil {
f.(*peer.Peer).State = peer.ONLINE
access.UpdateOne(f, f.GetID())
}
return nil
}