4 Commits

Author SHA1 Message Date
mr
46dee0a6cb To UTC 2026-03-19 08:26:46 +01:00
mr
2108df5283 NATS rationalized 2026-03-18 16:45:33 +01:00
mr
380de4c80b rationalized NATS 2026-03-18 16:45:11 +01:00
mr
83285c2ab5 Debug 2026-03-17 11:57:22 +01:00
16 changed files with 231 additions and 262 deletions

View File

@@ -42,7 +42,10 @@ type LongLivedStreamRecordedService[T interface{}] struct {
// remotePeer is the peer that sent the heartbeat (used for offload routing).
// need is how many more indexers the node wants (from hb.Need).
// referent is true when the node designated this indexer as its search referent.
BuildHeartbeatResponse func(remotePeer pp.ID, need int, challenges []string, challengeDID string, referent bool) *HeartbeatResponse
// rawRecord is the fresh signed PeerRecord embedded in the heartbeat (hb.Record),
// passed directly so the handler does not race with AfterHeartbeat goroutine
// updating StreamRecord.Record.
BuildHeartbeatResponse func(remotePeer pp.ID, need int, challenges []string, challengeDID string, referent bool, rawRecord json.RawMessage) *HeartbeatResponse
}
func (ix *LongLivedStreamRecordedService[T]) MaxNodesConn() int {
@@ -240,11 +243,11 @@ func (ix *LongLivedStreamRecordedService[T]) HandleHeartbeat(s network.Stream) {
ix.StreamMU.RUnlock()
}
if ix.AfterHeartbeat != nil && hb.DID != "" {
ix.AfterHeartbeat(hb)
go ix.AfterHeartbeat(hb)
}
// Send response back to the node (bidirectional heartbeat).
if ix.BuildHeartbeatResponse != nil {
if resp := ix.BuildHeartbeatResponse(s.Conn().RemotePeer(), hb.Need, hb.Challenges, hb.ChallengeDID, hb.Referent); resp != nil {
if resp := ix.BuildHeartbeatResponse(s.Conn().RemotePeer(), hb.Need, hb.Challenges, hb.ChallengeDID, hb.Referent, hb.Record); resp != nil {
s.SetWriteDeadline(time.Now().Add(3 * time.Second))
json.NewEncoder(s).Encode(resp)
s.SetWriteDeadline(time.Time{})

View File

@@ -138,8 +138,7 @@ func sendHeartbeat(ctx context.Context, h host.Host, proto protocol.ID, p *pp.Ad
logger.Info().Msg("New Stream engaged as Heartbeat " + fmt.Sprintf("%v", proto) + " " + p.ID.String())
s, err := h.NewStream(ctx, p.ID, proto)
if err != nil {
fmt.Println(s, err, p.ID)
logger.Err(err)
logger.Err(err).Msg(err.Error())
return nil, 0, err
}
pss = &Stream{

View File

@@ -1,16 +1,6 @@
package node
import (
"context"
"encoding/json"
"time"
"oc-discovery/daemons/node/common"
"oc-discovery/daemons/node/indexer"
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/models/peer"
"github.com/libp2p/go-libp2p/core/control"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
@@ -18,13 +8,10 @@ import (
ma "github.com/multiformats/go-multiaddr"
)
// OCConnectionGater enforces two rules on every inbound connection:
// 1. If the peer is known locally and blacklisted → reject.
// 2. If the peer is unknown locally → ask indexers one by one whether it
// exists in the DHT. Accept as soon as one confirms it; reject if none do
// (or if no indexers are reachable yet, allow optimistically).
//
// Outbound connections are always allowed — we chose to dial them.
// OCConnectionGater allows all connections unconditionally.
// Peer validation (local DB + DHT by peer_id) is enforced at the stream level
// in each handler, so ProtocolHeartbeat and ProtocolPublish — through which a
// node first registers itself — are never blocked.
type OCConnectionGater struct {
host host.Host
}
@@ -33,105 +20,12 @@ func newOCConnectionGater(h host.Host) *OCConnectionGater {
return &OCConnectionGater{host: h}
}
// InterceptPeerDial — allow all outbound dials.
func (g *OCConnectionGater) InterceptPeerDial(_ pp.ID) bool { return true }
// InterceptAddrDial — allow all outbound dials.
func (g *OCConnectionGater) InterceptAddrDial(_ pp.ID, _ ma.Multiaddr) bool { return true }
// InterceptAccept — allow at transport level (PeerID not yet known).
func (g *OCConnectionGater) InterceptAccept(_ network.ConnMultiaddrs) bool { return true }
// InterceptUpgraded — final gate; always allow (decisions already made in InterceptSecured).
func (g *OCConnectionGater) InterceptPeerDial(_ pp.ID) bool { return true }
func (g *OCConnectionGater) InterceptAddrDial(_ pp.ID, _ ma.Multiaddr) bool { return true }
func (g *OCConnectionGater) InterceptAccept(_ network.ConnMultiaddrs) bool { return true }
func (g *OCConnectionGater) InterceptSecured(_ network.Direction, _ pp.ID, _ network.ConnMultiaddrs) bool {
return true
}
func (g *OCConnectionGater) InterceptUpgraded(_ network.Conn) (bool, control.DisconnectReason) {
return true, 0
}
// InterceptSecured is called after the cryptographic handshake — PeerID is now known.
// Only inbound connections are verified; outbound are trusted.
func (g *OCConnectionGater) InterceptSecured(dir network.Direction, pid pp.ID, _ network.ConnMultiaddrs) bool {
if dir == network.DirOutbound {
return true
}
logger := oclib.GetLogger()
// 1. Local DB lookup by PeerID.
access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil)
results := access.Search(&dbs.Filters{
And: map[string][]dbs.Filter{ // search by name if no filters are provided
"peer_id": {{Operator: dbs.EQUAL.String(), Value: pid.String()}},
},
}, pid.String(), false)
for _, item := range results.Data {
p, ok := item.(*peer.Peer)
if !ok || p.PeerID != pid.String() {
continue
}
if p.Relation == peer.BLACKLIST {
logger.Warn().Str("peer", pid.String()).Msg("[gater] rejected blacklisted peer")
return false
}
// Known, not blacklisted.
return true
}
// 2. Unknown locally — verify via indexers.
indexers := common.Indexers.GetAddrs()
if len(indexers) == 0 {
// No indexers reachable yet — allow optimistically (bootstrap phase).
logger.Warn().Str("peer", pid.String()).Msg("[gater] no indexers available, allowing unverified inbound")
return true
}
req := indexer.GetValue{PeerID: pid.String()}
// A single DHT GetValue already traverses the entire DHT network, so asking
// a second indexer would yield the same result. We only fall through to the
// next indexer if the current one is unreachable (transport error), not if
// it returns found=false (that answer is already DHT-wide authoritative).
for _, ai := range indexers {
found, reachable := queryIndexerPeerExists(g.host, *ai.Info, req)
if !reachable {
continue // indexer down — try next
}
if !found {
logger.Warn().Str("peer", pid.String()).Msg("[gater] peer not found in DHT, rejecting inbound")
}
return found // definitive DHT answer
}
// All indexers unreachable — allow optimistically rather than blocking indefinitely.
logger.Warn().Str("peer", pid.String()).Msg("[gater] all indexers unreachable, allowing unverified inbound")
return true
}
// queryIndexerPeerExists opens a fresh one-shot stream to ai, sends a GetValue
// request, and returns (found, reachable).
// reachable=false means the indexer could not be reached (transport error);
// the caller should then try another indexer.
// reachable=true means the indexer answered — found is the DHT-wide authoritative result.
func queryIndexerPeerExists(h host.Host, ai pp.AddrInfo, req indexer.GetValue) (found, reachable bool) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
if h.Network().Connectedness(ai.ID) != network.Connected {
if err := h.Connect(ctx, ai); err != nil {
return false, false
}
}
s, err := h.NewStream(ctx, ai.ID, common.ProtocolGet)
if err != nil {
return false, false
}
defer s.Close()
s.SetDeadline(time.Now().Add(3 * time.Second))
if err := json.NewEncoder(s).Encode(req); err != nil {
return false, false
}
var resp indexer.GetResponse
if err := json.NewDecoder(s).Decode(&resp); err != nil {
return false, false
}
return resp.Found, true
}

View File

@@ -82,7 +82,7 @@ type nodeBehavior struct {
}
func (nb *nodeBehavior) isBanned() bool {
return time.Now().Before(nb.bannedUntil)
return time.Now().UTC().Before(nb.bannedUntil)
}
func (nb *nodeBehavior) strike(n int) {

View File

@@ -12,6 +12,7 @@ import (
"time"
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/dbs"
pp "cloud.o-forge.io/core/oc-lib/models/peer"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
@@ -108,6 +109,49 @@ func (ix *IndexerService) genPIDKey(peerID string) string {
return "/pid/" + peerID
}
// isPeerKnown is the stream-level gate: returns true if pid is allowed.
// Check order (fast → slow):
// 1. In-memory stream records — currently heartbeating to this indexer.
// 2. Local DB by peer_id — known peer, blacklist enforced here.
// 3. DHT /pid/{peerID} → /node/{DID} — registered on any indexer.
//
// ProtocolHeartbeat and ProtocolPublish handlers do NOT call this — they are
// the streams through which a node first makes itself known.
func (ix *IndexerService) isPeerKnown(pid lpp.ID) bool {
// 1. Fast path: active heartbeat session.
ix.StreamMU.RLock()
_, active := ix.StreamRecords[common.ProtocolHeartbeat][pid]
ix.StreamMU.RUnlock()
if active {
return true
}
// 2. Local DB: known peer (handles blacklist).
access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil)
results := access.Search(&dbs.Filters{
And: map[string][]dbs.Filter{
"peer_id": {{Operator: dbs.EQUAL.String(), Value: pid.String()}},
},
}, pid.String(), false)
for _, item := range results.Data {
p, ok := item.(*pp.Peer)
if !ok || p.PeerID != pid.String() {
continue
}
return p.Relation != pp.BLACKLIST
}
// 3. DHT lookup by peer_id.
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
did, err := ix.DHT.GetValue(ctx, ix.genPIDKey(pid.String()))
cancel()
if err != nil || len(did) == 0 {
return false
}
ctx2, cancel2 := context.WithTimeout(context.Background(), 3*time.Second)
_, err = ix.DHT.GetValue(ctx2, ix.genKey(string(did)))
cancel2()
return err == nil
}
func (ix *IndexerService) initNodeHandler() {
logger := oclib.GetLogger()
logger.Info().Msg("Init Node Handler")
@@ -144,21 +188,32 @@ func (ix *IndexerService) initNodeHandler() {
logger.Warn().Err(err).Str("did", rec.DID).Msg("indexer: heartbeat record signature invalid")
return
}
// Keep StreamRecord.Record in sync so BuildHeartbeatResponse always
// sees a populated PeerRecord (Name, DID, etc.) regardless of whether
// handleNodePublish ran before or after the heartbeat stream was opened.
if pid, err := lpp.Decode(rec.PeerID); err == nil {
ix.StreamMU.Lock()
if srec, ok := ix.StreamRecords[common.ProtocolHeartbeat][pid]; ok {
srec.Record = rec
}
ix.StreamMU.Unlock()
}
data, err := json.Marshal(rec)
if err != nil {
return
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
logger.Info().Msg("REFRESH PutValue " + ix.genKey(rec.DID))
if err := ix.DHT.PutValue(ctx, ix.genKey(rec.DID), data); err != nil {
logger.Warn().Err(err).Str("did", rec.DID).Msg("indexer: DHT refresh failed")
cancel()
return
logger.Warn().Err(err).Str("did", rec.DID).Msg("indexer: DHT refresh /node/ failed")
}
cancel()
// /pid/ is written unconditionally — the gater queries by PeerID and this
// index must stay fresh regardless of whether the /node/ write succeeded.
if rec.PeerID != "" {
ctx2, cancel2 := context.WithTimeout(context.Background(), 10*time.Second)
ix.DHT.PutValue(ctx2, ix.genPIDKey(rec.PeerID), []byte(rec.DID))
if err := ix.DHT.PutValue(ctx2, ix.genPIDKey(rec.PeerID), []byte(rec.DID)); err != nil {
logger.Warn().Err(err).Str("pid", rec.PeerID).Msg("indexer: DHT refresh /pid/ failed")
}
cancel2()
}
}
@@ -173,6 +228,12 @@ func (ix *IndexerService) initNodeHandler() {
// Returns a random sample of indexers from the local DHT cache.
func (ix *IndexerService) handleCandidateRequest(s network.Stream) {
defer s.Close()
if !ix.isPeerKnown(s.Conn().RemotePeer()) {
logger := oclib.GetLogger()
logger.Warn().Str("peer", s.Conn().RemotePeer().String()).Msg("[candidates] unknown peer, rejecting stream")
s.Reset()
return
}
s.SetDeadline(time.Now().Add(5 * time.Second))
var req common.IndexerCandidatesRequest
if err := json.NewDecoder(s).Decode(&req); err != nil {
@@ -280,6 +341,11 @@ func (ix *IndexerService) handleNodeGet(s network.Stream) {
defer s.Close()
logger := oclib.GetLogger()
remotePeer := s.Conn().RemotePeer()
if !ix.isPeerKnown(remotePeer) {
logger.Warn().Str("peer", remotePeer.String()).Msg("[get] unknown peer, rejecting stream")
s.Reset()
return
}
if err := ix.behavior.RecordGet(remotePeer); err != nil {
logger.Warn().Err(err).Str("peer", remotePeer.String()).Msg("get refused")
s.Reset()

View File

@@ -74,6 +74,11 @@ func (ix *IndexerService) handleSearchPeer(s network.Stream) {
logger := oclib.GetLogger()
defer s.Reset()
if !ix.isPeerKnown(s.Conn().RemotePeer()) {
logger.Warn().Str("peer", s.Conn().RemotePeer().String()).Msg("[search] unknown peer, rejecting stream")
return
}
var req common.SearchPeerRequest
if err := json.NewDecoder(s).Decode(&req); err != nil || req.QueryID == "" {
return

View File

@@ -162,7 +162,7 @@ func NewIndexerService(h host.Host, ps *pubsub.PubSub, maxNode int) *IndexerServ
// Build and send a HeartbeatResponse after each received node heartbeat.
// Raw metrics only — no pre-cooked score. Node computes the score itself.
ix.BuildHeartbeatResponse = func(remotePeer pp.ID, need int, challenges []string, challengeDID string, referent bool) *common.HeartbeatResponse {
ix.BuildHeartbeatResponse = func(remotePeer pp.ID, need int, challenges []string, challengeDID string, referent bool, rawRecord json.RawMessage) *common.HeartbeatResponse {
ix.StreamMU.RLock()
peerCount := len(ix.StreamRecords[common.ProtocolHeartbeat])
// Collect lastSeen per active peer for challenge responses.
@@ -184,6 +184,16 @@ func NewIndexerService(h host.Host, ps *pubsub.PubSub, maxNode int) *IndexerServ
}
ix.StreamMU.RUnlock()
// AfterHeartbeat updates srec.Record asynchronously — it may not have run yet.
// Use rawRecord (the fresh signed PeerRecord embedded in the heartbeat) directly
// so referencedNodes always gets the current Name/DID regardless of timing.
if remotePeerRecord.Name == "" && len(rawRecord) > 0 {
var fresh PeerRecord
if json.Unmarshal(rawRecord, &fresh) == nil {
remotePeerRecord = fresh
}
}
// Update referent designation: node marks its best-scored indexer with Referent=true.
ix.updateReferent(remotePeer, remotePeerRecord, referent)

View File

@@ -6,10 +6,10 @@ import (
"fmt"
"oc-discovery/daemons/node/common"
"oc-discovery/daemons/node/stream"
"slices"
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/config"
"cloud.o-forge.io/core/oc-lib/models/peer"
"cloud.o-forge.io/core/oc-lib/tools"
pp "github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
@@ -25,81 +25,17 @@ type executionConsidersPayload struct {
func ListenNATS(n *Node) {
tools.NewNATSCaller().ListenNats(map[tools.NATSMethod]func(tools.NATSResponse){
/*tools.VERIFY_RESOURCE: func(resp tools.NATSResponse) {
if resp.FromApp == config.GetAppName() {
return
}
if res, err := resources.ToResource(resp.Datatype.EnumIndex(), resp.Payload); err == nil {
access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil)
p := access.LoadOne(res.GetCreatorID())
realP := p.ToPeer()
if realP == nil {
return
} else if realP.Relation == peer.SELF {
pubKey, err := common.PubKeyFromString(realP.PublicKey) // extract pubkey from pubkey str
if err != nil {
return
}
ok, _ := pubKey.Verify(resp.Payload, res.GetSignature())
if b, err := json.Marshal(stream.Verify{
IsVerified: ok,
}); err == nil {
tools.NewNATSCaller().SetNATSPub(tools.VERIFY_RESOURCE, tools.NATSResponse{
FromApp: "oc-discovery",
Method: int(tools.VERIFY_RESOURCE),
Payload: b,
})
}
} else if realP.Relation != peer.BLACKLIST {
n.StreamService.PublishVerifyResources(&resp.Datatype, resp.User, realP.PeerID, resp.Payload)
}
}
},*/
tools.CREATE_RESOURCE: func(resp tools.NATSResponse) {
if resp.FromApp == config.GetAppName() && resp.Datatype != tools.PEER && resp.Datatype != tools.WORKFLOW {
return
}
logger := oclib.GetLogger()
m := map[string]interface{}{}
err := json.Unmarshal(resp.Payload, &m)
if err != nil {
logger.Err(err)
return
}
p := &peer.Peer{}
p = p.Deserialize(m, p).(*peer.Peer)
ad, err := pp.AddrInfoFromString(p.StreamAddress)
if err != nil {
return
}
// Non-partner: close any existing streams for this peer.
if p.Relation != peer.PARTNER {
n.StreamService.Mu.Lock()
defer n.StreamService.Mu.Unlock()
ps := common.ProtocolStream{}
for proto, s := range n.StreamService.Streams {
m := map[pp.ID]*common.Stream{}
for k := range s {
if ad.ID != k {
m[k] = s[k]
} else {
s[k].Stream.Close()
}
}
ps[proto] = m
}
n.StreamService.Streams = ps
}
},
tools.PROPALGATION_EVENT: func(resp tools.NATSResponse) {
fmt.Println("PROPALGATION")
if resp.FromApp == config.GetAppName() {
return
}
p, err := oclib.GetMySelf()
if err != nil || p == nil || p.PeerID != n.PeerID.String() {
return
}
var propalgation tools.PropalgationMessage
err := json.Unmarshal(resp.Payload, &propalgation)
err = json.Unmarshal(resp.Payload, &propalgation)
var dt *tools.DataType
if propalgation.DataType > 0 {
dtt := tools.DataType(propalgation.DataType)
@@ -114,30 +50,40 @@ func ListenNATS(n *Node) {
if propalgation.Action == tools.PB_MINIO_CONFIG {
proto = stream.ProtocolMinioConfigResource
}
if err := json.Unmarshal(resp.Payload, &m); err == nil {
if err := json.Unmarshal(propalgation.Payload, &m); err == nil {
peers, _ := n.GetPeerRecord(context.Background(), m.PeerID)
for _, p := range peers {
n.StreamService.PublishCommon(&resp.Datatype, resp.User, resp.Groups,
p.PeerID, proto, resp.Payload)
p.PeerID, proto, propalgation.Payload)
}
}
case tools.PB_CREATE, tools.PB_UPDATE, tools.PB_DELETE:
fmt.Println(n.StreamService.ToPartnerPublishEvent(
context.Background(),
propalgation.Action,
dt, resp.User, resp.Groups,
propalgation.Payload,
))
if slices.Contains([]tools.DataType{tools.BOOKING, tools.PURCHASE_RESOURCE}, resp.Datatype) {
m := map[string]interface{}{}
if err := json.Unmarshal(propalgation.Payload, &m); err == nil {
if m["peer_id"] != nil {
n.StreamService.PublishCommon(&resp.Datatype, resp.User, resp.Groups,
fmt.Sprintf("%v", m["peer_id"]), stream.ProtocolCreateResource, propalgation.Payload)
}
}
} else {
fmt.Println(n.StreamService.ToPartnerPublishEvent(
context.Background(),
propalgation.Action,
dt, resp.User, resp.Groups,
propalgation.Payload,
))
}
case tools.PB_CONSIDERS:
switch resp.Datatype {
case tools.BOOKING, tools.PURCHASE_RESOURCE, tools.WORKFLOW_EXECUTION:
var m executionConsidersPayload
if err := json.Unmarshal(resp.Payload, &m); err == nil {
if err := json.Unmarshal(propalgation.Payload, &m); err == nil {
for _, p := range m.PeerIDs {
peers, _ := n.GetPeerRecord(context.Background(), p)
for _, pp := range peers {
n.StreamService.PublishCommon(&resp.Datatype, resp.User, resp.Groups,
pp.PeerID, stream.ProtocolConsidersResource, resp.Payload)
pp.PeerID, stream.ProtocolConsidersResource, propalgation.Payload)
}
}
}
@@ -156,10 +102,10 @@ func ListenNATS(n *Node) {
}
case tools.PB_PLANNER:
m := map[string]interface{}{}
if err := json.Unmarshal(resp.Payload, &m); err == nil {
if err := json.Unmarshal(propalgation.Payload, &m); err == nil {
b := []byte{}
if len(m) > 1 {
b = resp.Payload
b = propalgation.Payload
}
if m["peer_id"] == nil { // send to every active stream
n.StreamService.Mu.Lock()
@@ -168,10 +114,10 @@ func ListenNATS(n *Node) {
n.StreamService.PublishCommon(nil, resp.User, resp.Groups, pid.String(), stream.ProtocolSendPlanner, b)
}
}
n.StreamService.Mu.Unlock()
} else {
n.StreamService.PublishCommon(nil, resp.User, resp.Groups, fmt.Sprintf("%v", m["peer_id"]), stream.ProtocolSendPlanner, b)
}
n.StreamService.Mu.Unlock()
}
case tools.PB_CLOSE_PLANNER:
m := map[string]interface{}{}

View File

@@ -129,6 +129,37 @@ func InitNode(isNode bool, isIndexer bool) (*Node, error) {
if node.StreamService, err = stream.InitStream(context.Background(), node.Host, node.PeerID, 1000, node); err != nil {
panic(err)
}
node.StreamService.IsPeerKnown = func(pid pp.ID) bool {
// 1. Local DB: known peer (handles blacklist).
access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil)
results := access.Search(&dbs.Filters{
And: map[string][]dbs.Filter{
"peer_id": {{Operator: dbs.EQUAL.String(), Value: pid.String()}},
},
}, pid.String(), false)
for _, item := range results.Data {
p, ok := item.(*peer.Peer)
if !ok || p.PeerID != pid.String() {
continue
}
return p.Relation != peer.BLACKLIST
}
// 2. Ask a connected indexer → DHT lookup by peer_id.
for _, addr := range common.Indexers.GetAddrs() {
ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
s, err := h.NewStream(ctx, addr.Info.ID, common.ProtocolGet)
cancel()
if err != nil {
continue
}
json.NewEncoder(s).Encode(indexer.GetValue{PeerID: pid.String()})
var resp indexer.GetResponse
json.NewDecoder(s).Decode(&resp)
s.Reset()
return resp.Found
}
return false
}
if node.PubSubService, err = pubsub.InitPubSub(context.Background(), node.Host, node.PS, node, node.StreamService); err != nil {
panic(err)
@@ -144,7 +175,7 @@ func InitNode(isNode bool, isIndexer bool) (*Node, error) {
}
}
logger.Info().Msg("subscribe to decentralized search flow...")
node.SubscribeToSearch(node.PS, &f)
go node.SubscribeToSearch(node.PS, &f)
logger.Info().Msg("connect to NATS")
go ListenNATS(node)
logger.Info().Msg("Node is actually running.")
@@ -201,7 +232,6 @@ func (d *Node) publishPeerRecord(
// Results are pushed to onResult as they arrive; the function returns when
// the stream closes (idle timeout, explicit cancel, or indexer unreachable).
func (d *Node) SearchPeerRecord(userKey, needle string, onResult func(common.SearchHit)) {
fmt.Println("SearchPeerRecord", needle)
logger := oclib.GetLogger()
idleTimeout := common.SearchIdleTimeout()
@@ -240,6 +270,7 @@ func (d *Node) SearchPeerRecord(userKey, needle string, onResult func(common.Sea
<-ctx.Done()
s.SetReadDeadline(time.Now())
}()
seen := map[string]struct{}{}
dec := json.NewDecoder(s)
for {
var result common.SearchPeerResult
@@ -251,6 +282,14 @@ func (d *Node) SearchPeerRecord(userKey, needle string, onResult func(common.Sea
}
d.peerSearches.ResetIdle(searchKey)
for _, hit := range result.Records {
key := hit.PeerID
if key == "" {
key = hit.DID
}
if _, already := seen[key]; already {
continue
}
seen[key] = struct{}{}
onResult(hit)
}
}

View File

@@ -45,17 +45,17 @@ func (ps *StreamService) handleEvent(protocol string, evt *common.Event) error {
}
}
if protocol == ProtocolConsidersResource {
if err := ps.pass(evt, tools.PB_CONSIDERS); err != nil {
if err := ps.pass(evt, tools.CONSIDERS_EVENT); err != nil {
return err
}
}
if protocol == ProtocolAdmiraltyConfigResource {
if err := ps.pass(evt, tools.PB_ADMIRALTY_CONFIG); err != nil {
if err := ps.pass(evt, tools.ADMIRALTY_CONFIG_EVENT); err != nil {
return err
}
}
if protocol == ProtocolMinioConfigResource {
if err := ps.pass(evt, tools.PB_MINIO_CONFIG); err != nil {
if err := ps.pass(evt, tools.MINIO_CONFIG_EVENT); err != nil {
return err
}
}
@@ -86,6 +86,7 @@ func (abs *StreamService) verifyResponse(event *common.Event) error { //
}
func (abs *StreamService) sendPlanner(event *common.Event) error { //
fmt.Println("sendPlanner", len(event.Payload))
if len(event.Payload) == 0 {
if plan, err := planner.GenerateShallow(&tools.APIRequest{Admin: true}); err == nil {
if b, err := json.Marshal(plan); err == nil {
@@ -133,19 +134,13 @@ func (abs *StreamService) retrieveResponse(event *common.Event) error { //
return nil
}
func (abs *StreamService) pass(event *common.Event, action tools.PubSubAction) error { //
if b, err := json.Marshal(&tools.PropalgationMessage{
Action: action,
DataType: int(event.DataType),
func (abs *StreamService) pass(event *common.Event, method tools.NATSMethod) error { //
go tools.NewNATSCaller().SetNATSPub(method, tools.NATSResponse{
FromApp: "oc-discovery",
Datatype: tools.DataType(event.DataType),
Method: int(method),
Payload: event.Payload,
}); err == nil {
go tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{
FromApp: "oc-discovery",
Datatype: tools.DataType(event.DataType),
Method: int(tools.PROPALGATION_EVENT),
Payload: b,
})
}
})
return nil
}
@@ -166,8 +161,6 @@ func (ps *StreamService) handleEventFromPartner(evt *common.Event, protocol stri
}, evt.From, false)
if len(peers.Data) > 0 {
p := peers.Data[0].(*peer.Peer)
fmt.Println(evt.From, p.GetID(), peers.Data)
ps.SendResponse(p, evt, fmt.Sprintf("%v", search))
} else if p, err := ps.Node.GetPeerRecord(context.Background(), evt.From); err == nil && len(p) > 0 { // peer from is peerID
ps.SendResponse(p[0], evt, fmt.Sprintf("%v", search))
@@ -182,7 +175,7 @@ func (ps *StreamService) handleEventFromPartner(evt *common.Event, protocol stri
})
}
case ProtocolCreateResource, ProtocolUpdateResource:
fmt.Println("RECEIVED Protocol.Update")
fmt.Println("RECEIVED Protocol.Update", string(evt.Payload))
go tools.NewNATSCaller().SetNATSPub(tools.CREATE_RESOURCE, tools.NATSResponse{
FromApp: "oc-discovery",
Datatype: tools.DataType(evt.DataType),
@@ -218,13 +211,10 @@ func (abs *StreamService) SendResponse(p *peer.Peer, event *common.Event, search
} else {
for _, dt := range dts {
access := oclib.NewRequestAdmin(oclib.LibDataEnum(dt), nil)
peerID := p.GetID()
searched := access.Search(abs.FilterPeer(self.GetID(), event.Groups, search), "", false)
fmt.Println("SEND SEARCH_EVENT", self.GetID(), dt, len(searched.Data), peerID)
for _, ss := range searched.Data {
if j, err := json.Marshal(ss); err == nil {
_, err := abs.PublishCommon(&dt, event.User, event.Groups, p.PeerID, ProtocolSearchResource, j)
fmt.Println("Publish ERR", err)
abs.PublishCommon(&dt, event.User, event.Groups, p.PeerID, ProtocolSearchResource, j)
}
}
}

View File

@@ -26,7 +26,7 @@ func (ps *StreamService) PublishesCommon(dt *tools.DataType, user string, groups
for _, pes := range p.Data {
for _, proto := range protos {
if _, err := ps.PublishCommon(dt, user, groups, pes.(*peer.Peer).PeerID, proto, resource); err != nil {
return err
continue
}
}
}
@@ -57,7 +57,6 @@ func (ps *StreamService) PublishCommon(dt *tools.DataType, user string, groups [
if err != nil {
return nil, err
}
fmt.Println("WRITE")
return ps.write(toPeerID, ad, dt, user, resource, proto)
}
return nil, errors.New("peer unvalid " + toPeerID)
@@ -131,21 +130,17 @@ func (s *StreamService) write(
return nil, errors.New("no stream available for protocol " + fmt.Sprintf("%v", proto) + " from PID " + peerID.ID.String())
}
if self, err := oclib.GetMySelf(); err != nil {
return nil, err
} else {
stream := s.Streams[proto][peerID.ID]
evt := common.NewEvent(string(proto), self.PeerID, dt, user, payload)
fmt.Println("SEND EVENT ", peerID, proto, evt.From, evt.DataType, evt.Timestamp)
if err := json.NewEncoder(stream.Stream).Encode(evt); err != nil {
stream.Stream.Close()
logger.Err(err)
return nil, err
}
if protocolInfo, ok := protocols[proto]; ok && protocolInfo.WaitResponse {
go s.readLoop(stream, peerID.ID, proto, &common.ProtocolInfo{PersistantStream: true})
}
return stream, nil
}
stream := s.Streams[proto][peerID.ID]
evt := common.NewEvent(string(proto), s.Host.ID().String(), dt, user, payload)
fmt.Println("SEND EVENT ", peerID, proto, evt.From, evt.DataType, evt.Timestamp)
if err := json.NewEncoder(stream.Stream).Encode(evt); err != nil {
stream.Stream.Close()
logger.Err(err)
return nil, err
}
if protocolInfo, ok := protocols[proto]; ok && protocolInfo.WaitResponse {
go s.readLoop(stream, peerID.ID, proto, &common.ProtocolInfo{PersistantStream: true})
}
return stream, nil
}

View File

@@ -51,13 +51,16 @@ var protocolsPartners = map[protocol.ID]*common.ProtocolInfo{
}
type StreamService struct {
Key pp.ID
Host host.Host
Node common.DiscoveryPeer
Streams common.ProtocolStream
maxNodesConn int
Mu sync.RWMutex
Key pp.ID
Host host.Host
Node common.DiscoveryPeer
Streams common.ProtocolStream
maxNodesConn int
Mu sync.RWMutex
ResourceSearches *common.SearchTracker
// IsPeerKnown, when set, is called at stream open for every inbound protocol.
// Return false to reset the stream immediately. Left nil until wired by the node.
IsPeerKnown func(pid pp.ID) bool
}
func InitStream(ctx context.Context, h host.Host, key pp.ID, maxNode int, node common.DiscoveryPeer) (*StreamService, error) {
@@ -71,7 +74,7 @@ func InitStream(ctx context.Context, h host.Host, key pp.ID, maxNode int, node c
ResourceSearches: common.NewSearchTracker(),
}
for proto := range protocols {
service.Host.SetStreamHandler(proto, service.HandleResponse)
service.Host.SetStreamHandler(proto, service.gate(service.HandleResponse))
}
logger.Info().Msg("connect to partners...")
service.connectToPartners() // we set up a stream
@@ -79,6 +82,21 @@ func InitStream(ctx context.Context, h host.Host, key pp.ID, maxNode int, node c
return service, nil
}
// gate wraps a stream handler with IsPeerKnown validation.
// If the peer is unknown the entire connection is closed and the handler is not called.
// IsPeerKnown is read at stream-open time so it works even when set after InitStream.
func (s *StreamService) gate(h func(network.Stream)) func(network.Stream) {
return func(stream network.Stream) {
if s.IsPeerKnown != nil && !s.IsPeerKnown(stream.Conn().RemotePeer()) {
logger := oclib.GetLogger()
logger.Warn().Str("peer", stream.Conn().RemotePeer().String()).Msg("[stream] unknown peer, closing connection")
stream.Conn().Close()
return
}
h(stream)
}
}
func (s *StreamService) HandleResponse(stream network.Stream) {
s.Mu.Lock()
defer s.Mu.Unlock()
@@ -119,7 +137,7 @@ func (s *StreamService) connectToPartners() error {
go s.readLoop(s.Streams[proto][ss.Conn().RemotePeer()], ss.Conn().RemotePeer(), proto, info)
}
logger.Info().Msg("SetStreamHandler " + string(proto))
s.Host.SetStreamHandler(proto, f)
s.Host.SetStreamHandler(proto, s.gate(f))
}
return nil
}

View File

@@ -4,5 +4,6 @@
"NATS_URL": "nats://nats:4222",
"NODE_MODE": "node",
"NODE_ENDPOINT_PORT": 4003,
"NAME": "opencloud-demo-1",
"INDEXER_ADDRESSES": "/ip4/172.40.0.2/tcp/4002/p2p/12D3KooWC3GNStak8KCYtJq11Dxiq45EJV53z1ZvKetMcZBeBX6u"
}

View File

@@ -4,5 +4,6 @@
"NATS_URL": "nats://nats:4222",
"NODE_MODE": "node",
"NODE_ENDPOINT_PORT": 4004,
"NAME": "opencloud-demo-2",
"INDEXER_ADDRESSES": "/ip4/172.40.0.1/tcp/4001/p2p/12D3KooWGn3j4XqTSrjJDGGpTQERdDV5TPZdhQp87rAUnvQssvQu"
}

2
go.mod
View File

@@ -3,7 +3,7 @@ module oc-discovery
go 1.25.0
require (
cloud.o-forge.io/core/oc-lib v0.0.0-20260312141150-a335c905b3a2
cloud.o-forge.io/core/oc-lib v0.0.0-20260318143822-5976795d4406
github.com/ipfs/go-cid v0.6.0
github.com/libp2p/go-libp2p v0.47.0
github.com/libp2p/go-libp2p-record v0.3.1

2
go.sum
View File

@@ -6,6 +6,8 @@ cloud.o-forge.io/core/oc-lib v0.0.0-20260312073634-2c9c42dd516a h1:oCkb9l/Cvn0x6
cloud.o-forge.io/core/oc-lib v0.0.0-20260312073634-2c9c42dd516a/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA=
cloud.o-forge.io/core/oc-lib v0.0.0-20260312141150-a335c905b3a2 h1:DuB6SDThFVJVQ0iI0pZnBqtCE0uW+SNI7R7ndKixu2k=
cloud.o-forge.io/core/oc-lib v0.0.0-20260312141150-a335c905b3a2/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA=
cloud.o-forge.io/core/oc-lib v0.0.0-20260318143822-5976795d4406 h1:FN1EtRWn228JprAbnY5K863Fzj+SzMqQtKRtwvECbLw=
cloud.o-forge.io/core/oc-lib v0.0.0-20260318143822-5976795d4406/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/Masterminds/semver/v3 v3.4.0 h1:Zog+i5UMtVoCU8oKka5P7i9q9HgrJeGzI9SA1Xbatp0=
github.com/Masterminds/semver/v3 v3.4.0/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM=