decentralized -> peer discovery

This commit is contained in:
mr
2026-01-26 11:23:04 +01:00
parent 9e4d31e797
commit 07ec659bb4
6 changed files with 215 additions and 134 deletions

View File

@@ -22,6 +22,7 @@ func (o *DistributedPeerController) Search() {
//user, peerID, groups := oclib.ExtractTokenInfo(*o.Ctx.Request)
// store and return Id or post with UUIDLibDataEnum
search := o.Ctx.Input.Param(":search")
service := infrastructure.GetDHTService()
code := 400
err := "no DHT Service available"

View File

@@ -73,7 +73,7 @@ func (o *PeerController) Link() {
o.Data["json"] = map[string]interface{}{
"data": nil,
"code": 400,
"error": "can't link relation to ourself",
"error": "can't link relation by ourself",
}
o.ServeJSON()
return
@@ -82,7 +82,7 @@ func (o *PeerController) Link() {
o.Data["json"] = map[string]interface{}{
"data": nil,
"code": 400,
"error": "can't link relation",
"error": "can't link relation to an other peer",
}
o.ServeJSON()
return
@@ -122,7 +122,7 @@ func (o *PeerController) Link() {
func (o *PeerController) Unknown() {
user, peerID, groups := oclib.ExtractTokenInfo(*o.Ctx.Request)
id := o.Ctx.Input.Param(":id")
req := oclib.NewRequest(oclib.LibDataEnum(oclib.PEER), user, peerID, groups, nil)
req := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), user, peerID, groups, nil)
data := req.LoadOne(id)
o.changeRelation(data.ToPeer(), peer.NONE, req)
}
@@ -135,7 +135,7 @@ func (o *PeerController) Unknown() {
func (o *PeerController) Partner() {
user, peerID, groups := oclib.ExtractTokenInfo(*o.Ctx.Request)
id := o.Ctx.Input.Param(":id")
req := oclib.NewRequest(oclib.LibDataEnum(oclib.PEER), user, peerID, groups, nil)
req := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), user, peerID, groups, nil)
data := req.LoadOne(id)
o.changeRelation(data.ToPeer(), peer.PARTNER, req)
}
@@ -150,6 +150,7 @@ func (o *PeerController) Blacklist() {
id := o.Ctx.Input.Param(":id")
o.Data["json"] = oclib.NewRequest(oclib.LibDataEnum(oclib.PEER), user, peerID, groups, nil).UpdateOne(map[string]interface{}{
"relation": peer.BLACKLIST,
"state": peer.OFFLINE,
}, id)
}
@@ -218,3 +219,20 @@ func (o *PeerController) DeleteState() {
}, id)
o.ServeJSON()
}
/*
Un pair change le statut d'un autre pair, alors ce dernier est joins automatiquement, on ne peut pas s'auto lié seul un externe peut faire ce processus de demande.
On change le pair pour pouvoir le mettre à jour, alors, le lien se met à jour automatiquement. p1 -> update status -> link (p2) -> p2 response -> update status -> link (p1)
Que définit le partnership de type : Partner, Blacklist et None.
BlackList : implique qu'on ne peut pas "voir" un pair et ses ressources, d'un coté comme de l'autre. Il suffit qu'un pair définisse la Blacklist, le pair opposant ne peut plus le voir.
(DHT allons plus loin)
Partner : Donne accès à des ressources non publique duquel un partnership à été modélisé, de type * ou dédié.
None : Est visible, et ne donne accès qu'au ressources publique et aux profiles all. Les profiles dédiés ne s'appliquent que si on est dans une situation de partenariat.
Note: L'état PENDING revient à dire qu'une relation est à l'état None,... si il y a une vérification on considère tout status à None. C'est une vérification importante.
*/

2
go.mod
View File

@@ -5,7 +5,7 @@ go 1.24.6
toolchain go1.24.11
require (
cloud.o-forge.io/core/oc-lib v0.0.0-20260123065115-f3d7c65b18d1
cloud.o-forge.io/core/oc-lib v0.0.0-20260126093615-bc94f2b188e6
github.com/beego/beego/v2 v2.3.8
github.com/smartystreets/goconvey v1.7.2
)

2
go.sum
View File

@@ -16,6 +16,8 @@ cloud.o-forge.io/core/oc-lib v0.0.0-20260123064804-d06c9e933783 h1:ztJplXt5FIrST
cloud.o-forge.io/core/oc-lib v0.0.0-20260123064804-d06c9e933783/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
cloud.o-forge.io/core/oc-lib v0.0.0-20260123065115-f3d7c65b18d1 h1:K7ind/dAshdoFb0om35YY6phWJcYhHj1YMlTrrwKH4s=
cloud.o-forge.io/core/oc-lib v0.0.0-20260123065115-f3d7c65b18d1/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
cloud.o-forge.io/core/oc-lib v0.0.0-20260126093615-bc94f2b188e6 h1:Sxjq1lQwSl+gkUYag4wAb6j74uU/JZviw1hkFavt58o=
cloud.o-forge.io/core/oc-lib v0.0.0-20260126093615-bc94f2b188e6/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/beego/beego/v2 v2.3.8 h1:wplhB1pF4TxR+2SS4PUej8eDoH4xGfxuHfS7wAk9VBc=
github.com/beego/beego/v2 v2.3.8/go.mod h1:8vl9+RrXqvodrl9C8yivX1e6le6deCK6RWeq8R7gTTg=

View File

@@ -16,7 +16,7 @@ func verify(pub crypto.PubKey, data, sig []byte) (bool, error) {
return pub.Verify(data, sig)
}
func loadKeyFromFile(isPublic bool) (crypto.PrivKey, error) {
func LoadKeyFromFile(isPublic bool) (crypto.PrivKey, error) {
path := conf.GetConfig().PrivateKeyPath
if isPublic {
path = conf.GetConfig().PublicKeyPath
@@ -35,12 +35,12 @@ func loadKeyFromFile(isPublic bool) (crypto.PrivKey, error) {
}
func VerifyPubWithPriv() bool {
priv, err := loadKeyFromFile(false)
priv, err := LoadKeyFromFile(false)
if err != nil {
fmt.Println(err)
return false
}
pub, err := loadKeyFromFile(true)
pub, err := LoadKeyFromFile(true)
if err != nil {
fmt.Println(err)
return false

View File

@@ -7,10 +7,11 @@ import (
"errors"
"fmt"
"oc-peer/conf"
"slices"
"time"
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/models/peer"
pp "cloud.o-forge.io/core/oc-lib/models/peer"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
"github.com/google/uuid"
@@ -21,20 +22,23 @@ import (
)
type DHTRecord struct {
Name string
State int
PeerID string
PubKey []byte
URL string
NATSUrl string
Signature []byte
ExpiryDate time.Time
Name string `json:"name"`
State int `json:"state"`
DID string `json:"did"`
PeerID string `json:"peer_id"`
PubKey []byte `json:"pub_key"`
URL string `json:"url"`
NATSUrl string `json:"nats_url"`
Wallet string `json:"wallet"`
Signature []byte `json:"signature"`
ExpiryDate time.Time `json:"expiry_date"`
}
type DHTService struct {
Key string
Host host.Host
DHT *dht.IpfsDHT
Cache []string
}
var singletonService *DHTService
@@ -45,7 +49,7 @@ func GetDHTService() *DHTService {
func Init(ctx context.Context) (*DHTService, error) {
service := &DHTService{}
priv, err := loadKeyFromFile(false)
priv, err := LoadKeyFromFile(false)
if err != nil {
return nil, err
}
@@ -63,21 +67,115 @@ func Init(ctx context.Context) (*DHTService, error) {
if err != nil {
return nil, err
}
err = service.DHT.Bootstrap(ctx)
if err != nil {
return nil, err
}
singletonService = service
if VerifyPubWithPriv() {
if _, err := singletonService.ClaimName(context.Background(),
conf.GetConfig().Name,
conf.GetConfig().Hostname); err == nil {
go func() {
for {
singletonService.RefreshName(context.Background())
time.Sleep(59 * time.Minute)
}
}()
go service.RefreshKeys(ctx, 30*time.Minute)
}
}
return service, service.DHT.Bootstrap(ctx)
return service, nil
}
func (d *DHTService) RefreshKeys(ctx context.Context, interval time.Duration) {
ticker := time.NewTicker(interval)
go func() {
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
s := []string{}
s = append(s, d.Cache...)
for _, key := range s {
_, _ = d.GetValue(ctx, key)
}
}
}
}()
}
func (d *DHTService) PutValue(
ctx context.Context,
key string,
value []byte,
) error {
err := d.DHT.PutValue(ctx, key, value)
if err != nil {
return err
}
if !slices.Contains(d.Cache, key) {
d.Cache = append(d.Cache, key)
}
return nil
}
func (d *DHTService) GetValue(
ctx context.Context,
key string,
) (*DHTRecord, error) {
dht, err := d.DHT.GetValue(ctx, key)
if err != nil {
cache := []string{}
for _, c := range d.Cache {
if c != key {
cache = append(cache, c)
}
}
d.Cache = cache
return nil, err
}
if !slices.Contains(d.Cache, key) {
d.Cache = append(d.Cache, key)
}
var data *DHTRecord
json.Unmarshal(dht, data)
if data == nil {
return nil, errors.New("no record found")
}
access := pp.NewAccessor(&tools.APIRequest{Admin: true})
p := &pp.Peer{
AbstractObject: utils.AbstractObject{
UUID: uuid.New().String(),
Name: data.Name,
},
State: pp.ONLINE,
Relation: pp.SELF,
PeerID: d.Host.ID().String(),
PublicKey: string(data.PubKey),
Url: data.URL,
NATSUrl: oclib.GetConfig().NATSUrl,
WalletAddress: data.Wallet,
}
if founded, _, err := access.Search(nil, fmt.Sprintf("%v", pp.SELF.EnumIndex()), false); err != nil || len(founded) == 0 {
access.StoreOne(p)
} else if f, _, err := access.LoadOne(founded[0].GetID()); err == nil && f.(*pp.Peer).Relation != pp.BLACKLIST {
f.(*pp.Peer).State = pp.ONLINE
f.(*pp.Peer).NATSUrl = p.NATSUrl
f.(*pp.Peer).Url = p.Url
f.(*pp.Peer).PeerID = p.PeerID
f.(*pp.Peer).Relation = p.Relation
f.(*pp.Peer).WalletAddress = p.WalletAddress
access.UpdateOne(f, f.GetID())
}
return data, err
}
func (d *DHTService) generateKey(
sub string,
name string,
) string {
return "/opencloud/" + sub + "/" + name
}
// Create your peer.
@@ -85,7 +183,7 @@ func (d *DHTService) ClaimName(
ctx context.Context,
name string,
endPoint string,
) (*peer.Peer, error) {
) (*pp.Peer, error) {
if endPoint == "" {
return nil, errors.New("no endpoint found for peer" + name)
}
@@ -107,67 +205,62 @@ func (d *DHTService) ClaimName(
rec.URL = endPoint
rec.NATSUrl = oclib.GetConfig().NATSUrl
rec.State = peer.ONLINE.EnumIndex()
rec.State = pp.ONLINE.EnumIndex()
rec.ExpiryDate = expiry
data, _ := json.Marshal(rec)
key := name
key := d.generateKey("peer", rec.Name)
// retrieve your key name in standard
existing, err := d.DHT.GetValue(ctx, key)
if err == nil && existing != nil {
var old DHTRecord
_ = json.Unmarshal(existing, &old)
old, err := d.GetValue(ctx, key)
if err == nil {
if old.PeerID != d.Host.ID().String() { // check if someone claims your name before
return nil, errors.New("name already claimed by another peer")
// TODO : can be fragile if 2 peers connect at the same time
}
if now.After(old.ExpiryDate) {
payload, _ := json.Marshal(rec)
d.DHT.PutValue(ctx, key, payload)
d.PutValue(ctx, key, payload)
}
}
if err := d.DHT.PutValue(ctx, key, data); err != nil {
if err := d.PutValue(ctx, key, data); err != nil {
return nil, err
}
pubStr := base64.StdEncoding.EncodeToString(pubBytes)
d.Key = key
access := peer.NewAccessor(&tools.APIRequest{Admin: true})
p := &peer.Peer{
access := pp.NewAccessor(&tools.APIRequest{Admin: true})
p := &pp.Peer{
AbstractObject: utils.AbstractObject{
UUID: uuid.New().String(),
Name: name,
},
State: peer.ONLINE,
Relation: peer.SELF,
State: pp.ONLINE,
Relation: pp.SELF,
PeerID: d.Host.ID().String(),
PublicKey: pubStr,
Url: endPoint,
NATSUrl: oclib.GetConfig().NATSUrl,
WalletAddress: "my-wallet",
}
if founded, _, err := access.Search(nil, fmt.Sprintf("%v", peer.SELF.EnumIndex()), false); err != nil || len(founded) == 0 {
if founded, _, err := access.Search(nil, fmt.Sprintf("%v", pp.SELF.EnumIndex()), false); err != nil || len(founded) == 0 {
access.StoreOne(p)
} else if f, _, err := access.LoadOne(founded[0].GetID()); err == nil {
f.(*peer.Peer).Name = name
f.(*peer.Peer).PeerID = d.Host.ID().String()
f.(*peer.Peer).State = peer.ONLINE
f.(*peer.Peer).Url = endPoint
f.(*pp.Peer).Name = name
f.(*pp.Peer).PeerID = d.Host.ID().String()
f.(*pp.Peer).State = pp.ONLINE
f.(*pp.Peer).Url = endPoint
f.(*pp.Peer).NATSUrl = oclib.GetConfig().NATSUrl
f.(*pp.Peer).Relation = pp.SELF
access.UpdateOne(f, f.GetID())
}
return p, nil
}
func (d *DHTService) treatPeer(ctx context.Context, key string, data []byte) (*peer.Peer, error) {
var rec DHTRecord
if err := json.Unmarshal(data, &rec); err != nil {
return nil, err
}
func (d *DHTService) treatPeer(ctx context.Context, key string, rec *DHTRecord) (*pp.Peer, error) {
pubKey, err := crypto.UnmarshalPublicKey(rec.PubKey)
if err != nil {
return nil, err
@@ -187,107 +280,105 @@ func (d *DHTService) treatPeer(ctx context.Context, key string, data []byte) (*p
pubBytes, _ := pubKey.Raw()
pubStr := base64.StdEncoding.EncodeToString(pubBytes)
rel := peer.NONE
rel := pp.NONE
if d.Key == key {
rel = peer.SELF
rel = pp.SELF
}
p := &peer.Peer{
p := &pp.Peer{
AbstractObject: utils.AbstractObject{
UUID: uuid.New().String(),
Name: rec.Name,
},
State: peer.ONLINE,
State: pp.ONLINE,
Relation: rel,
PeerID: rec.PeerID,
PublicKey: pubStr,
Url: rec.URL,
NATSUrl: rec.NATSUrl,
WalletAddress: rec.Wallet,
}
access := peer.NewAccessor(&tools.APIRequest{Admin: true})
access := pp.NewAccessor(&tools.APIRequest{Admin: true})
if now.After(rec.ExpiryDate) {
rec.State = peer.OFFLINE.EnumIndex()
p.State = peer.OFFLINE
rec.State = pp.OFFLINE.EnumIndex()
p.State = pp.OFFLINE
payload, _ := json.Marshal(rec)
d.DHT.PutValue(ctx, key, payload)
d.PutValue(ctx, key, payload)
if founded, _, err := access.Search(nil, p.Name, false); err != nil || len(founded) == 0 {
access.StoreOne(p)
} else if f, _, err := access.LoadOne(founded[0].GetID()); err == nil {
f.(*peer.Peer).State = peer.OFFLINE
f.(*pp.Peer).State = pp.OFFLINE
access.UpdateOne(f, f.GetID())
}
return nil, errors.New("peer " + key + " is expired")
}
if rec.State == peer.OFFLINE.EnumIndex() {
if rec.State == pp.OFFLINE.EnumIndex() {
if founded, _, err := access.Search(nil, p.Name, false); err != nil || len(founded) == 0 {
pp, _, err := access.StoreOne(p)
ppp, _, err := access.StoreOne(p)
if err == nil {
return pp.(*peer.Peer), nil
return ppp.(*pp.Peer), nil
}
} else {
f, ok := peer.CheckPeerStatus(founded[0].GetID(), "")
f, ok := pp.CheckPeerStatus(founded[0].GetID(), "")
if ok {
f.State = peer.ONLINE
f.State = pp.ONLINE
} else {
f.State = peer.OFFLINE
f.State = pp.OFFLINE
}
pp, _, err := access.UpdateOne(f, f.GetID())
ppp, _, err := access.UpdateOne(f, f.GetID())
if err == nil && ok {
return pp.(*peer.Peer), nil
return ppp.(*pp.Peer), nil
}
}
}
return nil, errors.New("peer " + key + " is offline")
}
func (d *DHTService) GetPeer(
ctx context.Context,
name string,
) (*peer.Peer, error) {
key := name
data, err := d.DHT.GetValue(ctx, key)
if err != nil {
return nil, errors.New("no DHT peer not found")
}
return d.treatPeer(ctx, key, data)
}
// Discover a specific Peer
func (d *DHTService) DiscoverPeers(
ctx context.Context,
name string,
) ([]*peer.Peer, error) {
peers := []*peer.Peer{}
key := name
) ([]*pp.Peer, error) {
peers := []*pp.Peer{}
key := d.generateKey("peer", name)
datas, err := d.DHT.SearchValue(ctx, key)
if err != nil {
return nil, errors.New("no DHT peer not found")
}
for data := range datas {
if p, err := d.treatPeer(ctx, key, data); err == nil {
var dht *DHTRecord
json.Unmarshal(data, dht)
if p, err := d.treatPeer(ctx, key, dht); err == nil {
peers = append(peers, p)
}
}
return peers, nil
}
func (d *DHTService) GetPeer(
ctx context.Context,
name string,
) (*pp.Peer, error) {
key := d.generateKey("peer", name)
data, err := d.GetValue(ctx, key)
if err != nil {
return nil, errors.New("no DHT peer not found")
}
return d.treatPeer(ctx, key, data)
}
func (d *DHTService) existsDHT(ctx context.Context) (*DHTRecord, error) {
if d.Key == "" {
return nil, errors.New("no self peer found")
}
data, err := d.DHT.GetValue(ctx, d.Key)
rec, err := d.GetValue(ctx, d.Key)
if err != nil {
return nil, errors.New("no DHT peer found")
}
var rec DHTRecord
if err := json.Unmarshal(data, &rec); err != nil {
return &rec, err
}
pubKey, err := crypto.UnmarshalPublicKey(rec.PubKey)
if err != nil {
return &rec, err
return rec, err
}
dht := DHTRecord{
@@ -298,42 +389,11 @@ func (d *DHTService) existsDHT(ctx context.Context) (*DHTRecord, error) {
payload, _ := json.Marshal(dht)
if ok, _ := verify(pubKey, payload, rec.Signature); !ok {
return &rec, err
return rec, err
}
return &rec, nil
return rec, nil
}
func (d *DHTService) RefreshName( // peer should regulary refresh your host to not be settings up offline.
ctx context.Context,
) error {
rec, err := d.existsDHT(ctx)
if err != nil {
return err
}
now := time.Now()
expiry := now.Add(1 * time.Hour)
rec.State = peer.ONLINE.EnumIndex()
rec.ExpiryDate = expiry
payload, _ := json.Marshal(rec)
d.DHT.PutValue(ctx, d.Key, payload)
access := peer.NewAccessor(&tools.APIRequest{Admin: true})
p := &peer.Peer{
AbstractObject: utils.AbstractObject{
UUID: uuid.New().String(),
Name: rec.Name,
},
State: peer.ONLINE, // online
Relation: peer.SELF, // self
PeerID: rec.PeerID,
PublicKey: string(rec.PubKey),
Url: rec.URL,
NATSUrl: rec.NATSUrl,
}
if founded, _, err := access.Search(nil, rec.Name, false); err != nil || len(founded) == 0 {
access.StoreOne(p)
} else if f, _, err := access.LoadOne(founded[0].GetID()); err == nil {
f.(*peer.Peer).State = peer.ONLINE
access.UpdateOne(f, f.GetID())
}
return nil
}
/*
Apply Name interlude...
*/