diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md deleted file mode 100644 index 5bd67ad..0000000 --- a/ARCHITECTURE.md +++ /dev/null @@ -1,495 +0,0 @@ -# oc-discovery — Architecture et analyse technique - -> **Convention de lecture** -> Les points marqués ✅ ont été corrigés dans le code. Les points marqués ⚠️ restent ouverts. - -## Table des matières - -1. [Vue d'ensemble](#1-vue-densemble) -2. [Hiérarchie des rôles](#2-hiérarchie-des-rôles) -3. [Mécanismes principaux](#3-mécanismes-principaux) - - 3.1 Heartbeat long-lived (node → indexer) - - 3.2 Scoring de confiance - - 3.3 Enregistrement auprès des natifs (indexer → native) - - 3.4 Pool d'indexeurs : fetch + consensus - - 3.5 Self-delegation et offload loop - - 3.6 Résilience du mesh natif - - 3.7 DHT partagée - - 3.8 PubSub gossip (indexer registry) - - 3.9 Streams applicatifs (node ↔ node) -4. [Tableau récapitulatif](#4-tableau-récapitulatif) -5. [Risques et limites globaux](#5-risques-et-limites-globaux) -6. [Pistes d'amélioration](#6-pistes-damélioration) - ---- - -## 1. Vue d'ensemble - -`oc-discovery` est un service de découverte P2P pour le réseau OpenCloud. Il repose sur -**libp2p** (transport TCP + PSK réseau privé) et une **DHT Kademlia** (préfixe `oc`) -pour indexer les pairs. L'architecture est intentionnellement hiérarchique : des _natifs_ -stables servent de hubs autoritaires auxquels des _indexeurs_ s'enregistrent, et des _nœuds_ -ordinaires découvrent des indexeurs via ces natifs. - -``` - ┌──────────────┐ heartbeat ┌──────────────────┐ - │ Node │ ───────────────────► │ Indexer │ - │ (libp2p) │ ◄─────────────────── │ (DHT server) │ - └──────────────┘ stream applicatif └────────┬─────────┘ - │ subscribe / heartbeat - ▼ - ┌──────────────────┐ - │ Native Indexer │◄──► autres natifs - │ (hub autoritaire│ (mesh) - └──────────────────┘ -``` - -Tous les participants partagent une **clé pré-partagée (PSK)** qui isole le réseau -des connexions libp2p externes non autorisées. - ---- - -## 2. Hiérarchie des rôles - -| Rôle | Binaire | Responsabilité | -|---|---|---| -| **Node** | `node_mode=node` | Se fait indexer, publie/consulte des records DHT | -| **Indexer** | `node_mode=indexer` | Reçoit les heartbeats, écrit en DHT, s'enregistre auprès des natifs | -| **Native Indexer** | `node_mode=native` | Hub : tient le registre des indexeurs vivants, évalue le consensus, sert de fallback | - -Un même processus peut cumuler les rôles node+indexer ou indexer+native. - ---- - -## 3. Mécanismes principaux - -### 3.1 Heartbeat long-lived (node → indexer) - -**Fonctionnement** - -Un stream libp2p **persistant** (`/opencloud/heartbeat/1.0`) est ouvert depuis le nœud -vers chaque indexeur de son pool (`StaticIndexers`). Toutes les 20 secondes, le nœud -envoie un `Heartbeat` JSON sur ce stream. L'indexeur répond en enregistrant le peer dans -`StreamRecords[ProtocolHeartbeat]` avec une expiry de 2 min. - -Si `sendHeartbeat` échoue (stream reset, EOF, timeout), le peer est retiré de -`StaticIndexers` et `replenishIndexersFromNative` est déclenché. - -**Avantages** -- Détection rapide de déconnexion (erreur sur le prochain encode). -- Un seul stream par pair réduit la pression sur les connexions TCP. -- Le channel de nudge (`indexerHeartbeatNudge`) permet un reconnect immédiat sans - attendre le ticker de 20 s. - -**Limites / risques** -- ⚠️ Un seul stream persistant : si la couche TCP reste ouverte mais "gelée" (middlebox, - NAT silencieux), l'erreur peut ne pas remonter avant plusieurs minutes. -- ⚠️ `StaticIndexers` est une map partagée globale : si deux goroutines appellent - `replenishIndexersFromNative` simultanément (cas de perte multiple), on peut avoir - des écritures concurrentes non protégées hors des sections critiques. - ---- - -### 3.2 Scoring de confiance - -**Fonctionnement** - -Avant d'enregistrer un heartbeat dans `StreamRecords`, l'indexeur vérifie un **score -minimum** calculé par `CheckHeartbeat` : - -``` -Score = (0.4 × uptime_ratio + 0.4 × bpms + 0.2 × diversity) × 100 -``` - -- `uptime_ratio` : durée de présence du peer / durée depuis le démarrage de l'indexeur. -- `bpms` : débit mesuré via un stream dédié (`/opencloud/probe/1.0`) normalisé par 50 Mbps. -- `diversity` : ratio d'IP /24 distincts parmi les indexeurs que le peer déclare. - -Deux seuils sont appliqués selon l'état du peer : -- **Premier heartbeat** (peer absent de `StreamRecords`, uptime = 0) : seuil à **40**. -- **Heartbeats suivants** (uptime accumulé) : seuil à **75**. - -**Avantages** -- Décourage les peers éphémères ou lents d'encombrer le registre. -- La diversité réseau réduit le risque de concentration sur un seul sous-réseau. -- Le stream de probe dédié évite de polluer le stream JSON heartbeat avec des données binaires. -- Le double seuil permet aux nouveaux peers d'être admis dès leur première connexion. - -**Limites / risques** -- ✅ **Deadlock logique de démarrage corrigé** : avec uptime = 0 le score maximal était 60, - en-dessous du seuil de 75. Les nouveaux peers étaient silencieusement rejetés à jamais. - → Seuil abaissé à **40** pour le premier heartbeat (`isFirstHeartbeat`), 75 ensuite. -- ⚠️ Les seuils (40 / 75) restent câblés en dur, sans possibilité de configuration. -- ⚠️ La mesure de bande passante envoie entre 512 et 2048 octets par heartbeat : à 20 s - d'intervalle et 500 nœuds max, cela représente ~50 KB/s de trafic probe en continu. -- ⚠️ `diversity` est calculé sur les adresses que le nœud *déclare* avoir — ce champ est - auto-rapporté et non vérifié, facilement falsifiable. - ---- - -### 3.3 Enregistrement auprès des natifs (indexer → native) - -**Fonctionnement** - -Chaque indexeur (non-natif) envoie périodiquement (toutes les 60 s) une -`IndexerRegistration` JSON sur un stream one-shot (`/opencloud/native/subscribe/1.0`) -vers chaque natif configuré. Le natif : - -1. Stocke l'entrée en cache local avec un TTL de **90 s** (`IndexerTTL`). -2. Gossipe le `PeerID` sur le topic PubSub `oc-indexer-registry` aux autres natifs. -3. Persiste l'entrée en DHT de manière asynchrone (retry jusqu'à succès). - -**Avantages** -- Stream jetable : pas de ressource longue durée côté natif pour les enregistrements. -- Le cache local est immédiatement disponible pour `handleNativeGetIndexers` sans - attendre la DHT. -- La dissémination PubSub permet à d'autres natifs de connaître l'indexeur sans - qu'il ait besoin de s'y enregistrer directement. - -**Limites / risques** -- ✅ **TTL trop serré corrigé** : le TTL de 66 s n'était que 10 % au-dessus de l'intervalle - de 60 s — un léger retard réseau pouvait expirer un indexeur sain entre deux renewals. - → `IndexerTTL` porté à **90 s** (+50 %). -- ⚠️ Si le `PutValue` DHT échoue définitivement (réseau partitionné), le natif possède - l'entrée mais les autres natifs qui n'ont pas reçu le message PubSub ne la connaissent - jamais — incohérence silencieuse. -- ⚠️ `RegisterWithNative` ignore les adresses en `127.0.0.1`, mais ne gère pas - les adresses privées (RFC1918) qui seraient non routables depuis d'autres hôtes. - ---- - -### 3.4 Pool d'indexeurs : fetch + consensus - -**Fonctionnement** - -Lors de `ConnectToNatives` (démarrage ou replenish), le nœud/indexeur : - -1. **Fetch** : envoie `GetIndexersRequest` au premier natif répondant - (`/opencloud/native/indexers/1.0`), reçoit une liste de candidats. -2. **Consensus (round 1)** : interroge **tous** les natifs configurés en parallèle - (`/opencloud/native/consensus/1.0`, timeout 3 s, collecte sur 4 s). - Un indexeur est confirmé si **strictement plus de 50 %** des natifs répondants - le considèrent vivant. -3. **Consensus (round 2)** : si le pool est insuffisant, les suggestions des natifs - (indexeurs qu'ils connaissent mais qui n'étaient pas dans les candidats initiaux) - sont soumises à un second round. - -**Avantages** -- La règle de majorité absolue empêche un natif compromis ou désynchronisé d'injecter - des indexeurs fantômes. -- Le double round permet de compléter le pool avec des alternatives connues des natifs - sans sacrifier la vérification. -- Si le fetch retourne un **fallback** (natif comme indexeur), le consensus est skippé — - cohérent car il n'y a qu'une seule source. - -**Limites / risques** -- ⚠️ Avec **un seul natif** configuré (très courant en dev/test), le consensus est trivial - (100 % d'un seul vote) — la règle de majorité ne protège rien dans ce cas. -- ⚠️ `fetchIndexersFromNative` s'arrête au **premier natif répondant** (séquentiellement) : - si ce natif a un cache périmé ou partiel, le nœud obtient un pool sous-optimal sans - consulter les autres. -- ⚠️ Le timeout de collecte global (4 s) est fixe : sur un réseau lent ou géographiquement - distribué, des natifs valides peuvent être éliminés faute de réponse à temps. -- ⚠️ `replaceStaticIndexers` **ajoute** sans jamais retirer d'anciens indexeurs expirés : - le pool peut accumuler des entrées mortes que seul le heartbeat purge ensuite. - ---- - -### 3.5 Self-delegation et offload loop - -**Fonctionnement** - -Si un natif ne dispose d'aucun indexeur vivant lors d'un `handleNativeGetIndexers`, -il se désigne lui-même comme indexeur temporaire (`selfDelegate`) : il retourne sa propre -adresse multiaddr et ajoute le demandeur dans `responsiblePeers`, dans la limite de -`maxFallbackPeers` (50). Au-delà, la délégation est refusée et une réponse vide est -retournée pour que le nœud tente un autre natif. - -Toutes les 30 s, `runOffloadLoop` vérifie si des indexeurs réels sont de nouveau -disponibles. Si oui, pour chaque peer responsable : -- **Stream présent** : `Reset()` du stream heartbeat — le peer reçoit une erreur, - déclenche `replenishIndexersFromNative` et migre vers de vrais indexeurs. -- **Stream absent** (peer jamais admis par le scoring) : `ClosePeer()` sur la connexion - réseau — le peer reconnecte et re-demande ses indexeurs au natif. - -**Avantages** -- Continuité de service : un nœud n'est jamais bloqué en l'absence temporaire d'indexeurs. -- La migration est automatique et transparente pour le nœud. -- `Reset()` (vs `Close()`) interrompt les deux sens du stream, garantissant que le peer - reçoit bien une erreur. -- La limite de 50 empêche le natif de se retrouver surchargé lors de pénuries prolongées. - -**Limites / risques** -- ✅ **Offload sans stream corrigé** : si le heartbeat n'avait jamais été enregistré dans - `StreamRecords` (score < seuil — cas amplifié par le bug de scoring), l'offload - échouait silencieusement et le peer restait dans `responsiblePeers` indéfiniment. - → Branche `else` : `ClosePeer()` + suppression de `responsiblePeers`. -- ✅ **`responsiblePeers` illimité corrigé** : le natif acceptait un nombre arbitraire - de peers en self-delegation, devenant lui-même un indexeur surchargé. - → `selfDelegate` vérifie `len(responsiblePeers) >= maxFallbackPeers` et retourne - `false` si saturé. -- ⚠️ La délégation reste non coordonnée entre natifs : un natif surchargé refuse (retourne - vide) mais ne redirige pas explicitement vers un natif voisin qui aurait de la capacité. - ---- - -### 3.6 Résilience du mesh natif - -**Fonctionnement** - -Quand le heartbeat vers un natif échoue, `replenishNativesFromPeers` tente de trouver -un remplaçant dans cet ordre : - -1. `fetchNativeFromNatives` : demande à chaque natif vivant (`/opencloud/native/peers/1.0`) - une adresse de natif inconnue. -2. `fetchNativeFromIndexers` : demande à chaque indexeur connu - (`/opencloud/indexer/natives/1.0`) ses natifs configurés. -3. Si aucun remplaçant et `remaining ≤ 1` : `retryLostNative` relance un ticker de 30 s - qui retente la connexion directe au natif perdu. - -`EnsureNativePeers` maintient des heartbeats de natif à natif via `ProtocolHeartbeat`, -avec une **unique goroutine** couvrant toute la map `StaticNatives`. - -**Avantages** -- Le gossip multi-hop via indexeurs permet de retrouver un natif même si aucun pair - direct ne le connaît. -- `retryLostNative` gère le cas d'un seul natif (déploiement minimal). -- La reconnexion automatique (`retryLostNative`) déclenche `replenishIndexersIfNeeded` - pour restaurer aussi le pool d'indexeurs. - -**Limites / risques** -- ✅ **Goroutines heartbeat multiples corrigé** : `EnsureNativePeers` démarrait une - goroutine `SendHeartbeat` par adresse native (N natifs → N goroutines → N² heartbeats - par tick). → Utilisation de `nativeMeshHeartbeatOnce` : une seule goroutine itère sur - `StaticNatives`. -- ⚠️ `retryLostNative` tourne indéfiniment sans condition d'arrêt liée à la vie du processus - (pas de `context.Context`). Si le binaire est gracefully shutdown, cette goroutine - peut bloquer. -- ⚠️ La découverte transitoire (natif → indexeur → natif) est à sens unique : un indexeur - ne connaît que les natifs de sa propre config, pas les nouveaux natifs qui auraient - rejoint après son démarrage. - ---- - -### 3.7 DHT partagée - -**Fonctionnement** - -Tous les indexeurs et natifs participent à une DHT Kademlia (préfixe `oc`, mode -`ModeServer`). Deux namespaces sont utilisés : - -- `/node/` → `PeerRecord` JSON signé (publié par les indexeurs sur heartbeat de nœud). -- `/indexer/` → `liveIndexerEntry` JSON avec TTL (publié par les natifs). - -Chaque natif lance `refreshIndexersFromDHT` (toutes les 30 s) qui ré-hydrate son cache -local depuis la DHT pour les PeerIDs connus (`knownPeerIDs`) dont l'entrée locale a expiré. - -**Avantages** -- Persistance décentralisée : un record survit à la perte d'un seul natif ou indexeur. -- Validation des entrées : `PeerRecordValidator` et `IndexerRecordValidator` rejettent - les records malformés ou expirés au moment du `PutValue`. -- L'index secondaire `/name/` permet la résolution par nom humain. - -**Limites / risques** -- ⚠️ La DHT Kademlia en réseau privé (PSK) est fonctionnelle mais les nœuds bootstrap - ne sont pas configurés explicitement : la découverte dépend de connexions déjà établies, - ce qui peut ralentir la convergence au démarrage. -- ⚠️ `PutValue` est réessayé en boucle infinie si `"failed to find any peer in table"` — - une panne de réseau prolongée génère des goroutines bloquées. -- ⚠️ Si la PSK est compromise, un attaquant peut écrire dans la DHT ; les `liveIndexerEntry` - d'indexeurs ne sont pas signées, contrairement aux `PeerRecord`. -- ⚠️ `refreshIndexersFromDHT` prune `knownPeerIDs` si la DHT n'a aucune entrée fraîche, - mais ne prune pas `liveIndexers` — une entrée expirée reste en mémoire jusqu'au GC - ou au prochain refresh. - ---- - -### 3.8 PubSub gossip (indexer registry) - -**Fonctionnement** - -Quand un indexeur s'enregistre auprès d'un natif, ce dernier publie l'adresse sur le -topic GossipSub `oc-indexer-registry`. Les autres natifs abonnés mettent à jour leur -`knownPeerIDs` sans attendre la DHT. - -Le `TopicValidator` rejette tout message dont le contenu n'est pas un multiaddr -parseable valide avant qu'il n'atteigne la boucle de traitement. - -**Avantages** -- Dissémination quasi-instantanée entre natifs connectés. -- Complément utile à la DHT pour les registrations récentes qui n'ont pas encore - été persistées. -- Le filtre syntaxique bloque les messages malformés avant propagation dans le mesh. - -**Limites / risques** -- ✅ **`TopicValidator` sans validation corrigé** : le validateur acceptait systématiquement - tous les messages (`return true`), permettant à un natif compromis de gossiper - n'importe quelle donnée. - → Le validateur vérifie désormais que le message est un multiaddr parseable - (`pp.AddrInfoFromString`). -- ⚠️ La validation reste syntaxique uniquement : l'origine du message (l'émetteur - est-il un natif légitime ?) n'est pas vérifiée. -- ⚠️ Si le natif redémarre, il perd son abonnement et manque les messages publiés - pendant son absence. La re-hydratation depuis la DHT compense, mais avec un délai - pouvant aller jusqu'à 30 s. -- ⚠️ Le gossip ne porte que le `Addr` de l'indexeur, pas sa TTL ni sa signature. - ---- - -### 3.9 Streams applicatifs (node ↔ node) - -**Fonctionnement** - -`StreamService` gère les streams entre nœuds partenaires (relations `PARTNER` stockées -en base) via des protocols dédiés (`/opencloud/resource/*`). Un heartbeat partenaire -(`ProtocolHeartbeatPartner`) maintient les connexions actives. Les events sont routés -via `handleEvent` et le système NATS en parallèle. - -**Avantages** -- TTL par protocol (`PersistantStream`, `WaitResponse`) adapte le comportement au - type d'échange (longue durée pour le planner, courte pour les CRUDs). -- La GC (`gc()` toutes les 8 s, démarrée une seule fois dans `InitStream`) libère - rapidement les streams expirés. - -**Limites / risques** -- ✅ **Fuite de goroutines GC corrigée** : `HandlePartnerHeartbeat` appelait - `go s.StartGC(30s)` à chaque heartbeat reçu (~20 s), créant un nouveau ticker - goroutine infini à chaque appel. - → Appel supprimé ; la GC lancée par `InitStream` est suffisante. -- ✅ **Boucle infinie sur EOF corrigée** : `readLoop` effectuait `s.Stream.Close(); - continue` après une erreur de décodage, re-tentant indéfiniment de lire un stream - fermé. - → Remplacé par `return` ; les defers (`Close`, `delete`) nettoient correctement. -- ⚠️ La récupération de partenaires depuis `conf.PeerIDS` est marquée `TO REMOVE` : - présence de code provisoire en production. - ---- - -## 4. Tableau récapitulatif - -| Mécanisme | Protocole | Avantage principal | État du risque | -|---|---|---|---| -| Heartbeat node→indexer | `/opencloud/heartbeat/1.0` | Détection rapide de perte | ⚠️ Stream TCP gelé non détecté | -| Scoring de confiance | (inline dans heartbeat) | Filtre les pairs instables | ✅ Deadlock corrigé (seuil 40/75) | -| Enregistrement natif | `/opencloud/native/subscribe/1.0` | TTL ample, cache immédiat | ✅ TTL porté à 90 s | -| Fetch pool d'indexeurs | `/opencloud/native/indexers/1.0` | Prend le 1er natif répondant | ⚠️ Natif au cache périmé possible | -| Consensus | `/opencloud/native/consensus/1.0` | Majorité absolue | ⚠️ Trivial avec 1 seul natif | -| Self-delegation + offload | (in-memory) | Disponibilité sans indexeur | ✅ Limite 50 peers + ClosePeer | -| Mesh natif | `/opencloud/native/peers/1.0` | Gossip multi-hop | ✅ Goroutines dédupliquées | -| DHT | `/oc/kad/1.0.0` | Persistance décentralisée | ⚠️ Retry infini, pas de bootstrap | -| PubSub registry | `oc-indexer-registry` | Dissémination rapide | ✅ Validation multiaddr | -| Streams applicatifs | `/opencloud/resource/*` | TTL par protocol | ✅ Fuite GC + EOF corrigés | - ---- - -## 5. Risques et limites globaux - -### Sécurité - -- ⚠️ **Adresses auto-rapportées non vérifiées** : le champ `IndexersBinded` dans le heartbeat - est auto-déclaré par le nœud et sert à calculer la diversité. Un pair malveillant peut - gonfler son score en déclarant de fausses adresses. -- ⚠️ **PSK comme seule barrière d'entrée** : si la PSK est compromise (elle est statique et - fichier-based), tout l'isolement réseau saute. Il n'y a pas de rotation de clé ni - d'authentification supplémentaire par pair. -- ⚠️ **DHT sans ACL sur les entrées indexeur** : la signature des `PeerRecord` est vérifiée - à la lecture, mais les `liveIndexerEntry` ne sont pas signées. La validation PubSub - bloque les multiaddrs invalides mais pas les adresses d'indexeurs légitimes usurpées. - -### Disponibilité - -- ⚠️ **Single point of failure natif** : avec un seul natif, la perte de celui-ci stoppe - toute attribution d'indexeurs. `retryLostNative` pallie, mais sans indexeurs, les nœuds - ne peuvent pas publier. -- ⚠️ **Bootstrap DHT** : sans nœuds bootstrap explicites, la DHT met du temps à converger - si les connexions initiales sont peu nombreuses. - -### Cohérence - -- ⚠️ **`replaceStaticIndexers` n'efface jamais** : d'anciens indexeurs morts restent dans - `StaticIndexers` jusqu'à ce que le heartbeat échoue. Un nœud peut avoir un pool - surévalué contenant des entrées inatteignables. -- ⚠️ **`TimeWatcher` global** : défini une seule fois au démarrage de `ConnectToIndexers`. - Si l'indexeur tourne depuis longtemps, les nouveaux nœuds auront un `uptime_ratio` - durablement faible. Le seuil abaissé à 40 pour le premier heartbeat atténue l'impact - initial, mais les heartbeats suivants devront accumuler un uptime suffisant. - ---- - -## 6. Pistes d'amélioration - -Les pistes déjà implémentées sont marquées ✅. Les pistes ouvertes restent à traiter. - -### ✅ Score : double seuil pour les nouveaux peers -~~Remplacer le seuil binaire~~ — **Implémenté** : seuil à 40 pour le premier heartbeat -(peer absent de `StreamRecords`), 75 pour les suivants. Un peer peut désormais être admis -dès sa première connexion sans bloquer sur l'uptime nul. -_Fichier : `common/common_stream.go`, `CheckHeartbeat`_ - -### ✅ TTL indexeur aligné avec l'intervalle de renouvellement -~~TTL de 66 s trop proche de 60 s~~ — **Implémenté** : `IndexerTTL` passé à **90 s**. -_Fichier : `indexer/native.go`_ - -### ✅ Limite de la self-delegation -~~`responsiblePeers` illimité~~ — **Implémenté** : `selfDelegate` retourne `false` quand -`len(responsiblePeers) >= maxFallbackPeers` (50). Le site d'appel retourne une réponse -vide et logue un warning. -_Fichier : `indexer/native.go`_ - -### ✅ Validation PubSub des adresses gossipées -~~`TopicValidator` accepte tout~~ — **Implémenté** : le validateur vérifie que le message -est un multiaddr parseable via `pp.AddrInfoFromString`. -_Fichier : `indexer/native.go`, `subscribeIndexerRegistry`_ - -### ✅ Goroutines heartbeat dédupliquées dans `EnsureNativePeers` -~~Une goroutine par adresse native~~ — **Implémenté** : `nativeMeshHeartbeatOnce` -garantit qu'une seule goroutine `SendHeartbeat` couvre toute la map `StaticNatives`. -_Fichier : `common/native_stream.go`_ - -### ✅ Fuite de goroutines GC dans `HandlePartnerHeartbeat` -~~`go s.StartGC(30s)` à chaque heartbeat~~ — **Implémenté** : appel supprimé ; la GC -de `InitStream` est suffisante. -_Fichier : `stream/service.go`_ - -### ✅ Boucle infinie sur EOF dans `readLoop` -~~`continue` après `Stream.Close()`~~ — **Implémenté** : remplacé par `return` pour -laisser les defers nettoyer proprement. -_Fichier : `stream/service.go`_ - ---- - -### ⚠️ Fetch pool : interroger tous les natifs en parallèle - -`fetchIndexersFromNative` s'arrête au premier natif répondant. Interroger tous les natifs -en parallèle et fusionner les listes (similairement à `clientSideConsensus`) éviterait -qu'un natif au cache périmé fournisse un pool sous-optimal. - -### ⚠️ Consensus avec quorum configurable - -Le seuil de confirmation (`count*2 > total`) est câblé en dur. Le rendre configurable -(ex. `consensus_quorum: 0.67`) permettrait de durcir la règle sur des déploiements -à 3+ natifs sans modifier le code. - -### ⚠️ Désenregistrement explicite - -Ajouter un protocole `/opencloud/native/unsubscribe/1.0` : quand un indexeur s'arrête -proprement, il notifie les natifs pour invalider son TTL immédiatement plutôt qu'attendre -90 s. - -### ⚠️ Bootstrap DHT explicite - -Configurer les natifs comme nœuds bootstrap DHT via `dht.BootstrapPeers` pour accélérer -la convergence Kademlia au démarrage. - -### ⚠️ Context propagé dans les goroutines longue durée - -`retryLostNative`, `refreshIndexersFromDHT` et `runOffloadLoop` ne reçoivent aucun -`context.Context`. Les passer depuis `InitNative` permettrait un arrêt propre lors du -shutdown du processus. - -### ⚠️ Redirection explicite lors du refus de self-delegation - -Quand un natif refuse la self-delegation (pool saturé), retourner vide force le nœud à -réessayer sans lui indiquer vers qui se tourner. Une liste de natifs alternatifs dans la -réponse (`AlternativeNatives []string`) permettrait au nœud de trouver directement un -natif moins chargé. diff --git a/daemons/node/common/common_cache.go b/daemons/node/common/common_cache.go index e39e4ab..44d4fe7 100644 --- a/daemons/node/common/common_cache.go +++ b/daemons/node/common/common_cache.go @@ -37,43 +37,6 @@ type Score struct { // Peer witnesses witnessChecked int witnessConsistent int - // WitnessPool: up to 3 witnesses last reported by this indexer. - // Used for indirect probing when the indexer becomes unreachable. - // Oldest entry is replaced when the pool is full and a fresher witness arrives. - WitnessPool []WitnessCacheEntry -} - -// WitnessCacheEntry holds one witness AddrInfo with its last-seen timestamp. -const maxWitnessPool = 3 - -type WitnessCacheEntry struct { - AI pp.AddrInfo - SeenAt time.Time -} - -// UpdateWitnessPool inserts or refreshes a witness entry. -// If the pool is full and the witness is new, the oldest entry is replaced. -func (s *Score) UpdateWitnessPool(w pp.AddrInfo) { - for i, e := range s.WitnessPool { - if e.AI.ID == w.ID { - s.WitnessPool[i].AI = w - s.WitnessPool[i].SeenAt = time.Now() - return - } - } - entry := WitnessCacheEntry{AI: w, SeenAt: time.Now()} - if len(s.WitnessPool) < maxWitnessPool { - s.WitnessPool = append(s.WitnessPool, entry) - return - } - // Replace oldest. - oldest := 0 - for i, e := range s.WitnessPool { - if e.SeenAt.Before(s.WitnessPool[oldest].SeenAt) { - oldest = i - } - } - s.WitnessPool[oldest] = entry } // computeNodeSideScore computes the node's quality assessment of an indexer from raw metrics. diff --git a/daemons/node/common/common_heartbeat.go b/daemons/node/common/common_heartbeat.go index 4349d98..8dd24af 100644 --- a/daemons/node/common/common_heartbeat.go +++ b/daemons/node/common/common_heartbeat.go @@ -172,46 +172,6 @@ func HandleWitnessQuery(h host.Host, s network.Stream) { json.NewEncoder(s).Encode(report) } -// IndirectProbeIndexer asks each witness in the cache whether it still sees -// the given indexer (by PeerID). Returns true if at least one witness confirms -// it is alive — meaning our direct link is asymmetrically broken, not the indexer. -// All probes run in parallel; the function blocks at most 5 seconds. -func IndirectProbeIndexer(h host.Host, indexerPeerID string, pool []WitnessCacheEntry) bool { - if len(pool) == 0 { - return false - } - results := make(chan bool, len(pool)) - for _, e := range pool { - go func(ai pp.AddrInfo) { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - s, err := h.NewStream(ctx, ai.ID, ProtocolWitnessQuery) - if err != nil { - results <- false - return - } - defer s.Reset() - s.SetDeadline(time.Now().Add(5 * time.Second)) - if err := json.NewEncoder(s).Encode(WitnessRequest{IndexerPeerID: indexerPeerID}); err != nil { - results <- false - return - } - var rep WitnessReport - if err := json.NewDecoder(s).Decode(&rep); err != nil { - results <- false - return - } - results <- rep.Seen - }(e.AI) - } - for range pool { - if <-results { - return true - } - } - return false -} - // SupportsHeartbeat probes pid with a short-lived stream to verify it has // a ProtocolHeartbeat handler (i.e. it is an indexer, not a plain node). // Only protocol negotiation is performed — no data is sent. diff --git a/daemons/node/common/common_indexer_hb.go b/daemons/node/common/common_indexer_hb.go index d232034..a575499 100644 --- a/daemons/node/common/common_indexer_hb.go +++ b/daemons/node/common/common_indexer_hb.go @@ -368,7 +368,6 @@ func SendHeartbeat(ctx context.Context, proto protocol.ID, name string, h host.H resp, rtt, err := sendHeartbeat(ctx, h, proto, ai.Info, hb, directory.Streams, interval*time.Second) if err != nil { // Heartbeat fails - fmt.Println("EERR", err) HeartbeatFailure(h, proto, directory, ai.Addr, ai.Info, isIndexerHB, maxPool, err) continue } @@ -377,6 +376,7 @@ func SendHeartbeat(ctx context.Context, proto protocol.ID, name string, h host.H // even if the indexer does not support bidirectional heartbeat (Fix 1). if isIndexerHB && score != nil { score.UptimeTracker.RecordHeartbeat() + score.UptimeTracker.ConsecutiveFails = 0 // reset on success maxRTT := BaseRoundTrip * 10 latencyScore := 1.0 - float64(rtt)/float64(maxRTT) @@ -442,11 +442,6 @@ func SendHeartbeat(ctx context.Context, proto protocol.ID, name string, h host.H } } - // Refresh local witness cache for indirect probing on future failure. - for _, w := range resp.Witnesses { - score.UpdateWitnessPool(w) - } - // Launch witness cross-check asynchronously (must not hold lock). if len(resp.Witnesses) > 0 { go queryWitnesses(h, ai.Info.ID.String(), resp.BornAt, resp.FillRate, resp.Witnesses, score) @@ -550,16 +545,22 @@ func HeartbeatFailure(h host.Host, proto protocol.ID, directory *Directory, Msg("[pool] seed heartbeat failed — keeping in pool, ticker will retry " + err.Error()) return } - // Indirect probe: query cached witnesses before declaring the indexer dead. - // If a witness confirms it is alive, the failure is a local asymmetric - // link — not the indexer. Skip eviction; next tick will retry directly. - if len(score.WitnessPool) > 0 { - pool := append([]WitnessCacheEntry(nil), score.WitnessPool...) - if IndirectProbeIndexer(h, info.ID.String(), pool) { + // Indirect probing via other alive indexers: + // If other indexers in the pool are still responding, they act as implicit + // third-party witnesses confirming our connectivity is fine — the failed + // indexer is genuinely dead, evict immediately. + // If this is the last indexer, there is no third party. Retry up to 3 times + // (consecutive failures tracked in UptimeTracker) before declaring it dead. + if len(directory.GetAddrs()) <= 1 { + score.UptimeTracker.ConsecutiveFails++ + if score.UptimeTracker.ConsecutiveFails < 3 { logger.Warn().Str("peer", info.ID.String()). - Msg("[indirect] witness confirms indexer alive — asymmetric link, skipping eviction " + err.Error()) + Int("attempt", score.UptimeTracker.ConsecutiveFails). + Msg("[indirect] last indexer failed, retrying before eviction") return } + logger.Warn().Str("peer", info.ID.String()). + Msg("[indirect] last indexer failed 3 times consecutively, evicting") } } } diff --git a/daemons/node/common/common_scoring.go b/daemons/node/common/common_scoring.go index 00f38f6..2265829 100644 --- a/daemons/node/common/common_scoring.go +++ b/daemons/node/common/common_scoring.go @@ -18,9 +18,10 @@ const MaxPayloadChallenge = 2048 const BaseRoundTrip = 400 * time.Millisecond type UptimeTracker struct { - FirstSeen time.Time - LastSeen time.Time - TotalOnline time.Duration + FirstSeen time.Time + LastSeen time.Time + TotalOnline time.Duration + ConsecutiveFails int // incremented on each heartbeat failure; reset to 0 on success } // RecordHeartbeat accumulates online time gap-aware: only counts the interval if diff --git a/daemons/node/common/interface.go b/daemons/node/common/interface.go index cedebbc..864435a 100644 --- a/daemons/node/common/interface.go +++ b/daemons/node/common/interface.go @@ -12,6 +12,6 @@ type HeartBeatStreamed interface { } type DiscoveryPeer interface { - GetPeerRecord(ctx context.Context, key string, search bool) ([]*peer.Peer, error) + GetPeerRecord(ctx context.Context, key string) ([]*peer.Peer, error) GetPubSub(topicName string) *pubsub.Topic } diff --git a/daemons/node/indexer/handler.go b/daemons/node/indexer/handler.go index 5723afa..7181d44 100644 --- a/daemons/node/indexer/handler.go +++ b/daemons/node/indexer/handler.go @@ -5,7 +5,6 @@ import ( "encoding/base64" "encoding/json" "errors" - "fmt" "io" "math/rand" "oc-discovery/daemons/node/common" @@ -94,8 +93,6 @@ func (pr *PeerRecord) ExtractPeer(ourkey string, key string, pubKey crypto.PubKe type GetValue struct { Key string `json:"key"` PeerID string `json:"peer_id,omitempty"` - Name string `json:"name,omitempty"` - Search bool `json:"search,omitempty"` } type GetResponse struct { @@ -107,10 +104,6 @@ func (ix *IndexerService) genKey(did string) string { return "/node/" + did } -func (ix *IndexerService) genNameKey(name string) string { - return "/name/" + name -} - func (ix *IndexerService) genPIDKey(peerID string) string { return "/pid/" + peerID } @@ -163,16 +156,10 @@ func (ix *IndexerService) initNodeHandler() { return } cancel() - ix.publishNameEvent(NameIndexAdd, rec.Name, rec.PeerID, rec.DID) - if rec.Name != "" { - ctx2, cancel2 := context.WithTimeout(context.Background(), 10*time.Second) - ix.DHT.PutValue(ctx2, ix.genNameKey(rec.Name), []byte(rec.DID)) - cancel2() - } if rec.PeerID != "" { - ctx3, cancel3 := context.WithTimeout(context.Background(), 10*time.Second) - ix.DHT.PutValue(ctx3, ix.genPIDKey(rec.PeerID), []byte(rec.DID)) - cancel3() + ctx2, cancel2 := context.WithTimeout(context.Background(), 10*time.Second) + ix.DHT.PutValue(ctx2, ix.genPIDKey(rec.PeerID), []byte(rec.DID)) + cancel2() } } ix.Host.SetStreamHandler(common.ProtocolHeartbeat, ix.HandleHeartbeat) @@ -277,24 +264,13 @@ func (ix *IndexerService) handleNodePublish(s network.Stream) { } cancel() - fmt.Println("publishNameEvent") - ix.publishNameEvent(NameIndexAdd, rec.Name, rec.PeerID, rec.DID) - - // Secondary index: /name/ → DID, so peers can resolve by human-readable name. - if rec.Name != "" { - ctx2, cancel2 := context.WithTimeout(context.Background(), 10*time.Second) - if err := ix.DHT.PutValue(ctx2, ix.genNameKey(rec.Name), []byte(rec.DID)); err != nil { - logger.Err(err).Str("name", rec.Name).Msg("indexer: failed to write name index") - } - cancel2() - } // Secondary index: /pid/ → DID, so peers can resolve by libp2p PeerID. if rec.PeerID != "" { - ctx3, cancel3 := context.WithTimeout(context.Background(), 10*time.Second) - if err := ix.DHT.PutValue(ctx3, ix.genPIDKey(rec.PeerID), []byte(rec.DID)); err != nil { + ctx2, cancel2 := context.WithTimeout(context.Background(), 10*time.Second) + if err := ix.DHT.PutValue(ctx2, ix.genPIDKey(rec.PeerID), []byte(rec.DID)); err != nil { logger.Err(err).Str("pid", rec.PeerID).Msg("indexer: failed to write pid index") } - cancel3() + cancel2() } return } @@ -324,52 +300,30 @@ func (ix *IndexerService) handleNodeGet(s network.Stream) { resp := GetResponse{Found: false, Records: map[string]PeerRecord{}} - fmt.Println("handleNodeGet", req.Search, req.Name) - keys := []string{} - // Name substring search — scan in-memory connected nodes first, then DHT exact match. - if req.Name != "" { - if req.Search { - for _, did := range ix.LookupNameIndex(strings.ToLower(req.Name)) { - keys = append(keys, did) - } - } else { - // 2. DHT exact-name lookup: covers nodes that published but aren't currently connected. - nameCtx, nameCancel := context.WithTimeout(context.Background(), 5*time.Second) - if ch, err := ix.DHT.SearchValue(nameCtx, ix.genNameKey(req.Name)); err == nil { - for did := range ch { - keys = append(keys, string(did)) - break - } - } - nameCancel() - } - } else if req.PeerID != "" { + // Resolve DID key: by PeerID (secondary /pid/ index) or direct DID key. + var key string + if req.PeerID != "" { pidCtx, pidCancel := context.WithTimeout(context.Background(), 5*time.Second) - if did, err := ix.DHT.GetValue(pidCtx, ix.genPIDKey(req.PeerID)); err == nil { - keys = append(keys, string(did)) - } + did, err := ix.DHT.GetValue(pidCtx, ix.genPIDKey(req.PeerID)) pidCancel() + if err == nil { + key = string(did) + } } else { - keys = append(keys, req.Key) + key = req.Key } - // DHT record fetch by DID key (covers exact-name and PeerID paths). - if len(keys) > 0 { - for _, k := range keys { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - fmt.Println("TRY TO CATCH DID", ix.genKey(k)) - c, err := ix.DHT.GetValue(ctx, ix.genKey(k)) - cancel() - fmt.Println("TRY TO CATCH DID ERR", ix.genKey(k), c, err) - if err == nil { - var rec PeerRecord - if json.Unmarshal(c, &rec) == nil { - fmt.Println("CATCH DID ERR", ix.genKey(k), rec) - resp.Records[rec.PeerID] = rec - } - } else if req.Name == "" && req.PeerID == "" { - logger.Err(err).Msg("Failed to fetch PeerRecord from DHT " + req.Key) + if key != "" { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + c, err := ix.DHT.GetValue(ctx, ix.genKey(key)) + cancel() + if err == nil { + var rec PeerRecord + if json.Unmarshal(c, &rec) == nil { + resp.Records[rec.PeerID] = rec } + } else { + logger.Err(err).Msg("Failed to fetch PeerRecord from DHT " + key) } } diff --git a/daemons/node/indexer/nameindex.go b/daemons/node/indexer/nameindex.go deleted file mode 100644 index d0e6521..0000000 --- a/daemons/node/indexer/nameindex.go +++ /dev/null @@ -1,320 +0,0 @@ -package indexer - -import ( - "context" - "encoding/json" - "strings" - "sync" - "time" - - "oc-discovery/daemons/node/common" - - oclib "cloud.o-forge.io/core/oc-lib" - pubsub "github.com/libp2p/go-libp2p-pubsub" - pp "github.com/libp2p/go-libp2p/core/peer" -) - -// TopicNameIndex is the GossipSub topic shared by regular indexers to exchange -// add/delete events for the distributed name→peerID mapping. -const TopicNameIndex = "oc-name-index" - -// nameIndexDedupWindow suppresses re-emission of the same (action, name, peerID) -// tuple within this window, reducing duplicate events when a node is registered -// with multiple indexers simultaneously. -const nameIndexDedupWindow = 30 * time.Second - -// NameIndexAction indicates whether a name mapping is being added or removed. -type NameIndexAction string - -const ( - NameIndexAdd NameIndexAction = "add" - NameIndexDelete NameIndexAction = "delete" -) - -// NameIndexEvent is published on TopicNameIndex by each indexer when a node -// registers (add) or is evicted by the GC (delete). -type NameIndexEvent struct { - Action NameIndexAction `json:"action"` - Name string `json:"name"` - PeerID string `json:"peer_id"` - DID string `json:"did"` -} - -// nameIndexState holds the local in-memory name index and the sender-side -// deduplication tracker. -// -// Search strategy: trigram inverted index. -// - byName: lowercased name → peerID → DID (for delete and exact resolution) -// - byPeer: peerID → lowercased name (to recompute trigrams on delete) -// - trigrams: 3-char substring → set of peerIDs (for O(1) substring lookup) -// -// For needles shorter than 3 chars the trigram index cannot help; a linear -// scan of byName is used as fallback (rare and fast enough at small N). -type nameIndexState struct { - byName map[string]map[string]string // name → peerID → DID - byPeer map[string]string // peerID → name - trigrams map[string]map[string]struct{} // trigram → peerID set - indexMu sync.RWMutex - - // emitted deduplicates GossipSub emissions within nameIndexDedupWindow. - // Purged periodically to prevent unbounded growth. - emitted map[string]time.Time - emittedMu sync.Mutex -} - -// trigramsOf returns all overlapping 3-char substrings of s (already lowercased). -// If s is shorter than 3 chars the string itself is returned as the sole token. -func trigramsOf(s string) []string { - if len(s) < 3 { - return []string{s} - } - out := make([]string, 0, len(s)-2) - for i := 0; i <= len(s)-3; i++ { - out = append(out, s[i:i+3]) - } - return out -} - -// addTrigrams inserts peerID into every trigram bucket for name. -func (s *nameIndexState) addTrigrams(name, peerID string) { - for _, tg := range trigramsOf(name) { - if s.trigrams[tg] == nil { - s.trigrams[tg] = map[string]struct{}{} - } - s.trigrams[tg][peerID] = struct{}{} - } -} - -// removeTrigrams deletes peerID from every trigram bucket for name, -// cleaning up empty buckets to keep memory tight. -func (s *nameIndexState) removeTrigrams(name, peerID string) { - for _, tg := range trigramsOf(name) { - if m := s.trigrams[tg]; m != nil { - delete(m, peerID) - if len(m) == 0 { - delete(s.trigrams, tg) - } - } - } -} - -// shouldEmit returns true if the (action, name, peerID) tuple has not been -// emitted within nameIndexDedupWindow, updating the tracker if so. -// -// On DELETE: the ADD entry for the same peer is immediately removed — the peer -// is gone, keeping it would cause the map to grow with departed peers forever. -// The DELETE entry itself is kept for the dedup window to absorb duplicate -// delete events, then cleaned by the purgeEmitted ticker. -func (s *nameIndexState) shouldEmit(action NameIndexAction, name, peerID string) bool { - key := string(action) + ":" + name + ":" + peerID - s.emittedMu.Lock() - defer s.emittedMu.Unlock() - if t, ok := s.emitted[key]; ok && time.Since(t) < nameIndexDedupWindow { - return false - } - s.emitted[key] = time.Now() - if action == NameIndexDelete { - // Peer is leaving: drop its ADD entry — no longer needed. - delete(s.emitted, string(NameIndexAdd)+":"+name+":"+peerID) - } - return true -} - -// purgeEmitted removes stale DELETE entries from the emitted dedup map. -// ADD entries are cleaned eagerly on DELETE, so only short-lived DELETE -// entries remain here; the ticker just trims those stragglers. -func (s *nameIndexState) purgeEmitted() { - now := time.Now() - s.emittedMu.Lock() - defer s.emittedMu.Unlock() - for k, t := range s.emitted { - if now.Sub(t) >= nameIndexDedupWindow { - delete(s.emitted, k) - } - } -} - -// onEvent applies a received NameIndexEvent to the local index. -// "add" inserts/updates the mapping; "delete" removes it. -// Operations are idempotent — duplicate events from multiple indexers are harmless. -func (s *nameIndexState) onEvent(evt NameIndexEvent) { - if evt.Name == "" || evt.PeerID == "" { - return - } - nameLow := strings.ToLower(evt.Name) - s.indexMu.Lock() - defer s.indexMu.Unlock() - switch evt.Action { - case NameIndexAdd: - // If the peer previously had a different name, clean up old trigrams. - if old, ok := s.byPeer[evt.PeerID]; ok && old != nameLow { - s.removeTrigrams(old, evt.PeerID) - if s.byName[old] != nil { - delete(s.byName[old], evt.PeerID) - if len(s.byName[old]) == 0 { - delete(s.byName, old) - } - } - } - if s.byName[nameLow] == nil { - s.byName[nameLow] = map[string]string{} - } - s.byName[nameLow][evt.PeerID] = evt.DID - s.byPeer[evt.PeerID] = nameLow - s.addTrigrams(nameLow, evt.PeerID) - - case NameIndexDelete: - // Use stored name so trigrams match exactly what was indexed. - name := nameLow - if stored, ok := s.byPeer[evt.PeerID]; ok { - name = stored - } - s.removeTrigrams(name, evt.PeerID) - delete(s.byPeer, evt.PeerID) - if s.byName[name] != nil { - delete(s.byName[name], evt.PeerID) - if len(s.byName[name]) == 0 { - delete(s.byName, name) - } - } - } -} - -// initNameIndex joins TopicNameIndex and starts consuming events. -// Must be called after ix.PS is ready. -func (ix *IndexerService) initNameIndex(ps *pubsub.PubSub) { - logger := oclib.GetLogger() - state := &nameIndexState{ - byName: map[string]map[string]string{}, - byPeer: map[string]string{}, - trigrams: map[string]map[string]struct{}{}, - emitted: map[string]time.Time{}, - } - ix.nameIndex = state - - // Periodically purge the emitted dedup map so it doesn't grow forever. - go func() { - t := time.NewTicker(nameIndexDedupWindow) - defer t.Stop() - for range t.C { - state.purgeEmitted() - } - }() - - ps.RegisterTopicValidator(TopicNameIndex, func(_ context.Context, _ pp.ID, _ *pubsub.Message) bool { - return true - }) - topic, err := ps.Join(TopicNameIndex) - if err != nil { - logger.Err(err).Msg("name index: failed to join topic") - return - } - ix.LongLivedStreamRecordedService.LongLivedPubSubService.PubsubMu.Lock() - ix.LongLivedStreamRecordedService.LongLivedPubSubService.LongLivedPubSubs[TopicNameIndex] = topic - ix.LongLivedStreamRecordedService.LongLivedPubSubService.PubsubMu.Unlock() - - common.SubscribeEvents( - ix.LongLivedStreamRecordedService.LongLivedPubSubService, - context.Background(), - TopicNameIndex, - -1, - func(_ context.Context, evt NameIndexEvent, _ string) { - ix.nameIndex.onEvent(evt) - }, - ) -} - -// publishNameEvent emits a NameIndexEvent on TopicNameIndex, subject to the -// sender-side deduplication window. -func (ix *IndexerService) publishNameEvent(action NameIndexAction, name, peerID, did string) { - if ix.nameIndex == nil || name == "" || peerID == "" { - return - } - if !ix.nameIndex.shouldEmit(action, name, peerID) { - return - } - ix.LongLivedStreamRecordedService.LongLivedPubSubService.PubsubMu.RLock() - topic := ix.LongLivedStreamRecordedService.LongLivedPubSubService.LongLivedPubSubs[TopicNameIndex] - ix.LongLivedStreamRecordedService.LongLivedPubSubService.PubsubMu.RUnlock() - if topic == nil { - return - } - evt := NameIndexEvent{Action: action, Name: name, PeerID: peerID, DID: did} - b, err := json.Marshal(evt) - if err != nil { - return - } - _ = topic.Publish(context.Background(), b) -} - -// LookupNameIndex searches the distributed name index for peers whose name -// contains needle (case-insensitive). Returns peerID → DID for matched peers. -// Returns nil if the name index is not initialised. -// -// Algorithm: -// - needle ≥ 3 chars: trigram intersection → O(|candidates|) verify pass. -// The trigram index immediately narrows the candidate set; false positives -// are eliminated by the full-string contains check. -// - needle < 3 chars: linear scan of byName (rare, still fast at small N). -func (ix *IndexerService) LookupNameIndex(needle string) map[string]string { - if ix.nameIndex == nil { - return nil - } - needleLow := strings.ToLower(needle) - result := map[string]string{} - - ix.nameIndex.indexMu.RLock() - defer ix.nameIndex.indexMu.RUnlock() - - if len(needleLow) < 3 { - // Short needle: linear scan fallback. - for name, peers := range ix.nameIndex.byName { - if strings.Contains(name, needleLow) { - for peerID, did := range peers { - result[peerID] = did - } - } - } - return result - } - - // Trigram intersection: start with the first trigram's set, then - // progressively intersect with each subsequent trigram's set. - tgs := trigramsOf(needleLow) - var candidates map[string]struct{} - for _, tg := range tgs { - set := ix.nameIndex.trigrams[tg] - if len(set) == 0 { - return result // any empty trigram set → no possible match - } - if candidates == nil { - candidates = make(map[string]struct{}, len(set)) - for pid := range set { - candidates[pid] = struct{}{} - } - } else { - for pid := range candidates { - if _, ok := set[pid]; !ok { - delete(candidates, pid) - } - } - } - if len(candidates) == 0 { - return result - } - } - - // Full-string verification pass: trigrams admit false positives - // (e.g. "abc" and "bca" share the trigram "bc_" with a rotated name). - for peerID := range candidates { - name := ix.nameIndex.byPeer[peerID] - if strings.Contains(name, needleLow) { - did := "" - if m := ix.nameIndex.byName[name]; m != nil { - did = m[peerID] - } - result[peerID] = did - } - } - return result -} diff --git a/daemons/node/indexer/service.go b/daemons/node/indexer/service.go index 77dcdbe..260e768 100644 --- a/daemons/node/indexer/service.go +++ b/daemons/node/indexer/service.go @@ -48,7 +48,6 @@ type IndexerService struct { DHT *dht.IpfsDHT isStrictIndexer bool mu sync.RWMutex - nameIndex *nameIndexState dhtProvideCancel context.CancelFunc bornAt time.Time // Passive DHT cache: refreshed every 2 min in background, used for suggestions. @@ -99,10 +98,7 @@ func NewIndexerService(h host.Host, ps *pubsub.PubSub, maxNode int) *IndexerServ go ix.SubscribeToSearch(ix.PS, nil) } - logger.Info().Msg("init distributed name index...") - ix.initNameIndex(ps) ix.LongLivedStreamRecordedService.AfterDelete = func(pid pp.ID, name, did string) { - ix.publishNameEvent(NameIndexDelete, name, pid.String(), did) // Remove behavior state for peers that are no longer connected and // have no active ban — keeps memory bounded to the live node set. ix.behavior.Cleanup(pid) @@ -489,9 +485,6 @@ func (ix *IndexerService) Close() { } ix.DHT.Close() ix.PS.UnregisterTopicValidator(common.TopicPubSubSearch) - if ix.nameIndex != nil { - ix.PS.UnregisterTopicValidator(TopicNameIndex) - } for _, s := range ix.StreamRecords { for _, ss := range s { ss.HeartbeatStream.Stream.Close() diff --git a/daemons/node/nats.go b/daemons/node/nats.go index d80e2cb..138ac31 100644 --- a/daemons/node/nats.go +++ b/daemons/node/nats.go @@ -115,7 +115,7 @@ func ListenNATS(n *Node) { proto = stream.ProtocolMinioConfigResource } if err := json.Unmarshal(resp.Payload, &m); err == nil { - peers, _ := n.GetPeerRecord(context.Background(), m.PeerID, false) + peers, _ := n.GetPeerRecord(context.Background(), m.PeerID) for _, p := range peers { n.StreamService.PublishCommon(&resp.Datatype, resp.User, p.PeerID, proto, resp.Payload) @@ -135,7 +135,7 @@ func ListenNATS(n *Node) { var m executionConsidersPayload if err := json.Unmarshal(resp.Payload, &m); err == nil { for _, p := range m.PeerIDs { - peers, _ := n.GetPeerRecord(context.Background(), p, false) + peers, _ := n.GetPeerRecord(context.Background(), p) for _, pp := range peers { n.StreamService.PublishCommon(&resp.Datatype, resp.User, pp.PeerID, stream.ProtocolConsidersResource, resp.Payload) @@ -148,7 +148,7 @@ func ListenNATS(n *Node) { OriginID string `json:"origin_id"` } if err := json.Unmarshal(propalgation.Payload, &m); err == nil && m.OriginID != "" { - peers, _ := n.GetPeerRecord(context.Background(), m.OriginID, false) + peers, _ := n.GetPeerRecord(context.Background(), m.OriginID) for _, p := range peers { n.StreamService.PublishCommon(nil, resp.User, p.PeerID, stream.ProtocolConsidersResource, propalgation.Payload) diff --git a/daemons/node/node.go b/daemons/node/node.go index 2b573b6..c8967ab 100644 --- a/daemons/node/node.go +++ b/daemons/node/node.go @@ -146,7 +146,7 @@ func InitNode(isNode bool, isIndexer bool) (*Node, error) { if err != nil || evt.From == node.PeerID.String() { return } - if p, err := node.GetPeerRecord(ctx, evt.From, false); err == nil && len(p) > 0 && m["search"] != nil { + if p, err := node.GetPeerRecord(ctx, evt.From); err == nil && len(p) > 0 && m["search"] != nil { node.StreamService.SendResponse(p[0], &evt, fmt.Sprintf("%v", m["search"])) } } @@ -275,21 +275,18 @@ func (d *Node) SearchPeerRecord(userKey, needle string, onResult func(common.Sea func (d *Node) GetPeerRecord( ctx context.Context, pidOrdid string, - search bool, ) ([]*peer.Peer, error) { var err error var info map[string]indexer.PeerRecord // Build the GetValue request: if pidOrdid is neither a UUID DID nor a libp2p // PeerID, treat it as a human-readable name and let the indexer resolve it. + // GetPeerRecord resolves by PeerID or DID only. + // Name-based search goes through SearchPeerRecord (ProtocolSearchPeer). getReq := indexer.GetValue{Key: pidOrdid} if pidR, pidErr := pp.Decode(pidOrdid); pidErr == nil { getReq.PeerID = pidR.String() - } else if _, uuidErr := uuid.Parse(pidOrdid); uuidErr != nil { - // Not a UUID DID → treat pidOrdid as a name substring search. - getReq.Name = pidOrdid getReq.Key = "" } - getReq.Search = search for _, ad := range common.Indexers.GetAddrs() { if common.Indexers.Streams, err = common.TempStream(d.Host, *ad.Info, common.ProtocolGet, "", common.Indexers.Streams, map[protocol.ID]*common.ProtocolInfo{}, &common.Indexers.MuStream); err != nil { diff --git a/daemons/node/stream/handler.go b/daemons/node/stream/handler.go index d46b42f..e2503c3 100644 --- a/daemons/node/stream/handler.go +++ b/daemons/node/stream/handler.go @@ -164,7 +164,7 @@ func (ps *StreamService) handleEventFromPartner(evt *common.Event, protocol stri fmt.Println(evt.From, p.GetID(), peers.Data) ps.SendResponse(p, evt, fmt.Sprintf("%v", search)) - } else if p, err := ps.Node.GetPeerRecord(context.Background(), evt.From, false); err == nil && len(p) > 0 { // peer from is peerID + } else if p, err := ps.Node.GetPeerRecord(context.Background(), evt.From); err == nil && len(p) > 0 { // peer from is peerID ps.SendResponse(p[0], evt, fmt.Sprintf("%v", search)) } } else { diff --git a/daemons/node/stream/publish.go b/daemons/node/stream/publish.go index df46e83..3dd19cd 100644 --- a/daemons/node/stream/publish.go +++ b/daemons/node/stream/publish.go @@ -49,7 +49,7 @@ func (ps *StreamService) PublishCommon(dt *tools.DataType, user string, toPeerID var pe *peer.Peer if len(p.Data) > 0 && p.Data[0].(*peer.Peer).Relation != peer.BLACKLIST { pe = p.Data[0].(*peer.Peer) - } else if pps, err := ps.Node.GetPeerRecord(context.Background(), toPeerID, false); err == nil && len(pps) > 0 { + } else if pps, err := ps.Node.GetPeerRecord(context.Background(), toPeerID); err == nil && len(pps) > 0 { pe = pps[0] } if pe != nil {