Compare commits
4 Commits
edcfecd24b
...
feature/no
| Author | SHA1 | Date | |
|---|---|---|---|
| 46dee0a6cb | |||
| 2108df5283 | |||
| 380de4c80b | |||
| 83285c2ab5 |
@@ -42,7 +42,10 @@ type LongLivedStreamRecordedService[T interface{}] struct {
|
||||
// remotePeer is the peer that sent the heartbeat (used for offload routing).
|
||||
// need is how many more indexers the node wants (from hb.Need).
|
||||
// referent is true when the node designated this indexer as its search referent.
|
||||
BuildHeartbeatResponse func(remotePeer pp.ID, need int, challenges []string, challengeDID string, referent bool) *HeartbeatResponse
|
||||
// rawRecord is the fresh signed PeerRecord embedded in the heartbeat (hb.Record),
|
||||
// passed directly so the handler does not race with AfterHeartbeat goroutine
|
||||
// updating StreamRecord.Record.
|
||||
BuildHeartbeatResponse func(remotePeer pp.ID, need int, challenges []string, challengeDID string, referent bool, rawRecord json.RawMessage) *HeartbeatResponse
|
||||
}
|
||||
|
||||
func (ix *LongLivedStreamRecordedService[T]) MaxNodesConn() int {
|
||||
@@ -240,11 +243,11 @@ func (ix *LongLivedStreamRecordedService[T]) HandleHeartbeat(s network.Stream) {
|
||||
ix.StreamMU.RUnlock()
|
||||
}
|
||||
if ix.AfterHeartbeat != nil && hb.DID != "" {
|
||||
ix.AfterHeartbeat(hb)
|
||||
go ix.AfterHeartbeat(hb)
|
||||
}
|
||||
// Send response back to the node (bidirectional heartbeat).
|
||||
if ix.BuildHeartbeatResponse != nil {
|
||||
if resp := ix.BuildHeartbeatResponse(s.Conn().RemotePeer(), hb.Need, hb.Challenges, hb.ChallengeDID, hb.Referent); resp != nil {
|
||||
if resp := ix.BuildHeartbeatResponse(s.Conn().RemotePeer(), hb.Need, hb.Challenges, hb.ChallengeDID, hb.Referent, hb.Record); resp != nil {
|
||||
s.SetWriteDeadline(time.Now().Add(3 * time.Second))
|
||||
json.NewEncoder(s).Encode(resp)
|
||||
s.SetWriteDeadline(time.Time{})
|
||||
|
||||
@@ -138,8 +138,7 @@ func sendHeartbeat(ctx context.Context, h host.Host, proto protocol.ID, p *pp.Ad
|
||||
logger.Info().Msg("New Stream engaged as Heartbeat " + fmt.Sprintf("%v", proto) + " " + p.ID.String())
|
||||
s, err := h.NewStream(ctx, p.ID, proto)
|
||||
if err != nil {
|
||||
fmt.Println(s, err, p.ID)
|
||||
logger.Err(err)
|
||||
logger.Err(err).Msg(err.Error())
|
||||
return nil, 0, err
|
||||
}
|
||||
pss = &Stream{
|
||||
|
||||
@@ -1,16 +1,6 @@
|
||||
package node
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
"oc-discovery/daemons/node/common"
|
||||
"oc-discovery/daemons/node/indexer"
|
||||
|
||||
oclib "cloud.o-forge.io/core/oc-lib"
|
||||
"cloud.o-forge.io/core/oc-lib/dbs"
|
||||
"cloud.o-forge.io/core/oc-lib/models/peer"
|
||||
"github.com/libp2p/go-libp2p/core/control"
|
||||
"github.com/libp2p/go-libp2p/core/host"
|
||||
"github.com/libp2p/go-libp2p/core/network"
|
||||
@@ -18,13 +8,10 @@ import (
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
)
|
||||
|
||||
// OCConnectionGater enforces two rules on every inbound connection:
|
||||
// 1. If the peer is known locally and blacklisted → reject.
|
||||
// 2. If the peer is unknown locally → ask indexers one by one whether it
|
||||
// exists in the DHT. Accept as soon as one confirms it; reject if none do
|
||||
// (or if no indexers are reachable yet, allow optimistically).
|
||||
//
|
||||
// Outbound connections are always allowed — we chose to dial them.
|
||||
// OCConnectionGater allows all connections unconditionally.
|
||||
// Peer validation (local DB + DHT by peer_id) is enforced at the stream level
|
||||
// in each handler, so ProtocolHeartbeat and ProtocolPublish — through which a
|
||||
// node first registers itself — are never blocked.
|
||||
type OCConnectionGater struct {
|
||||
host host.Host
|
||||
}
|
||||
@@ -33,105 +20,12 @@ func newOCConnectionGater(h host.Host) *OCConnectionGater {
|
||||
return &OCConnectionGater{host: h}
|
||||
}
|
||||
|
||||
// InterceptPeerDial — allow all outbound dials.
|
||||
func (g *OCConnectionGater) InterceptPeerDial(_ pp.ID) bool { return true }
|
||||
|
||||
// InterceptAddrDial — allow all outbound dials.
|
||||
func (g *OCConnectionGater) InterceptAddrDial(_ pp.ID, _ ma.Multiaddr) bool { return true }
|
||||
|
||||
// InterceptAccept — allow at transport level (PeerID not yet known).
|
||||
func (g *OCConnectionGater) InterceptAccept(_ network.ConnMultiaddrs) bool { return true }
|
||||
|
||||
// InterceptUpgraded — final gate; always allow (decisions already made in InterceptSecured).
|
||||
func (g *OCConnectionGater) InterceptPeerDial(_ pp.ID) bool { return true }
|
||||
func (g *OCConnectionGater) InterceptAddrDial(_ pp.ID, _ ma.Multiaddr) bool { return true }
|
||||
func (g *OCConnectionGater) InterceptAccept(_ network.ConnMultiaddrs) bool { return true }
|
||||
func (g *OCConnectionGater) InterceptSecured(_ network.Direction, _ pp.ID, _ network.ConnMultiaddrs) bool {
|
||||
return true
|
||||
}
|
||||
func (g *OCConnectionGater) InterceptUpgraded(_ network.Conn) (bool, control.DisconnectReason) {
|
||||
return true, 0
|
||||
}
|
||||
|
||||
// InterceptSecured is called after the cryptographic handshake — PeerID is now known.
|
||||
// Only inbound connections are verified; outbound are trusted.
|
||||
func (g *OCConnectionGater) InterceptSecured(dir network.Direction, pid pp.ID, _ network.ConnMultiaddrs) bool {
|
||||
if dir == network.DirOutbound {
|
||||
return true
|
||||
}
|
||||
logger := oclib.GetLogger()
|
||||
|
||||
// 1. Local DB lookup by PeerID.
|
||||
access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil)
|
||||
results := access.Search(&dbs.Filters{
|
||||
And: map[string][]dbs.Filter{ // search by name if no filters are provided
|
||||
"peer_id": {{Operator: dbs.EQUAL.String(), Value: pid.String()}},
|
||||
},
|
||||
}, pid.String(), false)
|
||||
for _, item := range results.Data {
|
||||
p, ok := item.(*peer.Peer)
|
||||
if !ok || p.PeerID != pid.String() {
|
||||
continue
|
||||
}
|
||||
if p.Relation == peer.BLACKLIST {
|
||||
logger.Warn().Str("peer", pid.String()).Msg("[gater] rejected blacklisted peer")
|
||||
return false
|
||||
}
|
||||
// Known, not blacklisted.
|
||||
return true
|
||||
}
|
||||
|
||||
// 2. Unknown locally — verify via indexers.
|
||||
indexers := common.Indexers.GetAddrs()
|
||||
|
||||
if len(indexers) == 0 {
|
||||
// No indexers reachable yet — allow optimistically (bootstrap phase).
|
||||
logger.Warn().Str("peer", pid.String()).Msg("[gater] no indexers available, allowing unverified inbound")
|
||||
return true
|
||||
}
|
||||
|
||||
req := indexer.GetValue{PeerID: pid.String()}
|
||||
// A single DHT GetValue already traverses the entire DHT network, so asking
|
||||
// a second indexer would yield the same result. We only fall through to the
|
||||
// next indexer if the current one is unreachable (transport error), not if
|
||||
// it returns found=false (that answer is already DHT-wide authoritative).
|
||||
for _, ai := range indexers {
|
||||
found, reachable := queryIndexerPeerExists(g.host, *ai.Info, req)
|
||||
if !reachable {
|
||||
continue // indexer down — try next
|
||||
}
|
||||
if !found {
|
||||
logger.Warn().Str("peer", pid.String()).Msg("[gater] peer not found in DHT, rejecting inbound")
|
||||
}
|
||||
return found // definitive DHT answer
|
||||
}
|
||||
|
||||
// All indexers unreachable — allow optimistically rather than blocking indefinitely.
|
||||
logger.Warn().Str("peer", pid.String()).Msg("[gater] all indexers unreachable, allowing unverified inbound")
|
||||
return true
|
||||
}
|
||||
|
||||
// queryIndexerPeerExists opens a fresh one-shot stream to ai, sends a GetValue
|
||||
// request, and returns (found, reachable).
|
||||
// reachable=false means the indexer could not be reached (transport error);
|
||||
// the caller should then try another indexer.
|
||||
// reachable=true means the indexer answered — found is the DHT-wide authoritative result.
|
||||
func queryIndexerPeerExists(h host.Host, ai pp.AddrInfo, req indexer.GetValue) (found, reachable bool) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
||||
defer cancel()
|
||||
|
||||
if h.Network().Connectedness(ai.ID) != network.Connected {
|
||||
if err := h.Connect(ctx, ai); err != nil {
|
||||
return false, false
|
||||
}
|
||||
}
|
||||
s, err := h.NewStream(ctx, ai.ID, common.ProtocolGet)
|
||||
if err != nil {
|
||||
return false, false
|
||||
}
|
||||
defer s.Close()
|
||||
s.SetDeadline(time.Now().Add(3 * time.Second))
|
||||
|
||||
if err := json.NewEncoder(s).Encode(req); err != nil {
|
||||
return false, false
|
||||
}
|
||||
var resp indexer.GetResponse
|
||||
if err := json.NewDecoder(s).Decode(&resp); err != nil {
|
||||
return false, false
|
||||
}
|
||||
return resp.Found, true
|
||||
}
|
||||
|
||||
@@ -82,7 +82,7 @@ type nodeBehavior struct {
|
||||
}
|
||||
|
||||
func (nb *nodeBehavior) isBanned() bool {
|
||||
return time.Now().Before(nb.bannedUntil)
|
||||
return time.Now().UTC().Before(nb.bannedUntil)
|
||||
}
|
||||
|
||||
func (nb *nodeBehavior) strike(n int) {
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
"time"
|
||||
|
||||
oclib "cloud.o-forge.io/core/oc-lib"
|
||||
"cloud.o-forge.io/core/oc-lib/dbs"
|
||||
pp "cloud.o-forge.io/core/oc-lib/models/peer"
|
||||
"cloud.o-forge.io/core/oc-lib/models/utils"
|
||||
"cloud.o-forge.io/core/oc-lib/tools"
|
||||
@@ -108,6 +109,49 @@ func (ix *IndexerService) genPIDKey(peerID string) string {
|
||||
return "/pid/" + peerID
|
||||
}
|
||||
|
||||
// isPeerKnown is the stream-level gate: returns true if pid is allowed.
|
||||
// Check order (fast → slow):
|
||||
// 1. In-memory stream records — currently heartbeating to this indexer.
|
||||
// 2. Local DB by peer_id — known peer, blacklist enforced here.
|
||||
// 3. DHT /pid/{peerID} → /node/{DID} — registered on any indexer.
|
||||
//
|
||||
// ProtocolHeartbeat and ProtocolPublish handlers do NOT call this — they are
|
||||
// the streams through which a node first makes itself known.
|
||||
func (ix *IndexerService) isPeerKnown(pid lpp.ID) bool {
|
||||
// 1. Fast path: active heartbeat session.
|
||||
ix.StreamMU.RLock()
|
||||
_, active := ix.StreamRecords[common.ProtocolHeartbeat][pid]
|
||||
ix.StreamMU.RUnlock()
|
||||
if active {
|
||||
return true
|
||||
}
|
||||
// 2. Local DB: known peer (handles blacklist).
|
||||
access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil)
|
||||
results := access.Search(&dbs.Filters{
|
||||
And: map[string][]dbs.Filter{
|
||||
"peer_id": {{Operator: dbs.EQUAL.String(), Value: pid.String()}},
|
||||
},
|
||||
}, pid.String(), false)
|
||||
for _, item := range results.Data {
|
||||
p, ok := item.(*pp.Peer)
|
||||
if !ok || p.PeerID != pid.String() {
|
||||
continue
|
||||
}
|
||||
return p.Relation != pp.BLACKLIST
|
||||
}
|
||||
// 3. DHT lookup by peer_id.
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
||||
did, err := ix.DHT.GetValue(ctx, ix.genPIDKey(pid.String()))
|
||||
cancel()
|
||||
if err != nil || len(did) == 0 {
|
||||
return false
|
||||
}
|
||||
ctx2, cancel2 := context.WithTimeout(context.Background(), 3*time.Second)
|
||||
_, err = ix.DHT.GetValue(ctx2, ix.genKey(string(did)))
|
||||
cancel2()
|
||||
return err == nil
|
||||
}
|
||||
|
||||
func (ix *IndexerService) initNodeHandler() {
|
||||
logger := oclib.GetLogger()
|
||||
logger.Info().Msg("Init Node Handler")
|
||||
@@ -144,21 +188,32 @@ func (ix *IndexerService) initNodeHandler() {
|
||||
logger.Warn().Err(err).Str("did", rec.DID).Msg("indexer: heartbeat record signature invalid")
|
||||
return
|
||||
}
|
||||
// Keep StreamRecord.Record in sync so BuildHeartbeatResponse always
|
||||
// sees a populated PeerRecord (Name, DID, etc.) regardless of whether
|
||||
// handleNodePublish ran before or after the heartbeat stream was opened.
|
||||
if pid, err := lpp.Decode(rec.PeerID); err == nil {
|
||||
ix.StreamMU.Lock()
|
||||
if srec, ok := ix.StreamRecords[common.ProtocolHeartbeat][pid]; ok {
|
||||
srec.Record = rec
|
||||
}
|
||||
ix.StreamMU.Unlock()
|
||||
}
|
||||
data, err := json.Marshal(rec)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
logger.Info().Msg("REFRESH PutValue " + ix.genKey(rec.DID))
|
||||
if err := ix.DHT.PutValue(ctx, ix.genKey(rec.DID), data); err != nil {
|
||||
logger.Warn().Err(err).Str("did", rec.DID).Msg("indexer: DHT refresh failed")
|
||||
cancel()
|
||||
return
|
||||
logger.Warn().Err(err).Str("did", rec.DID).Msg("indexer: DHT refresh /node/ failed")
|
||||
}
|
||||
cancel()
|
||||
// /pid/ is written unconditionally — the gater queries by PeerID and this
|
||||
// index must stay fresh regardless of whether the /node/ write succeeded.
|
||||
if rec.PeerID != "" {
|
||||
ctx2, cancel2 := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
ix.DHT.PutValue(ctx2, ix.genPIDKey(rec.PeerID), []byte(rec.DID))
|
||||
if err := ix.DHT.PutValue(ctx2, ix.genPIDKey(rec.PeerID), []byte(rec.DID)); err != nil {
|
||||
logger.Warn().Err(err).Str("pid", rec.PeerID).Msg("indexer: DHT refresh /pid/ failed")
|
||||
}
|
||||
cancel2()
|
||||
}
|
||||
}
|
||||
@@ -173,6 +228,12 @@ func (ix *IndexerService) initNodeHandler() {
|
||||
// Returns a random sample of indexers from the local DHT cache.
|
||||
func (ix *IndexerService) handleCandidateRequest(s network.Stream) {
|
||||
defer s.Close()
|
||||
if !ix.isPeerKnown(s.Conn().RemotePeer()) {
|
||||
logger := oclib.GetLogger()
|
||||
logger.Warn().Str("peer", s.Conn().RemotePeer().String()).Msg("[candidates] unknown peer, rejecting stream")
|
||||
s.Reset()
|
||||
return
|
||||
}
|
||||
s.SetDeadline(time.Now().Add(5 * time.Second))
|
||||
var req common.IndexerCandidatesRequest
|
||||
if err := json.NewDecoder(s).Decode(&req); err != nil {
|
||||
@@ -280,6 +341,11 @@ func (ix *IndexerService) handleNodeGet(s network.Stream) {
|
||||
defer s.Close()
|
||||
logger := oclib.GetLogger()
|
||||
remotePeer := s.Conn().RemotePeer()
|
||||
if !ix.isPeerKnown(remotePeer) {
|
||||
logger.Warn().Str("peer", remotePeer.String()).Msg("[get] unknown peer, rejecting stream")
|
||||
s.Reset()
|
||||
return
|
||||
}
|
||||
if err := ix.behavior.RecordGet(remotePeer); err != nil {
|
||||
logger.Warn().Err(err).Str("peer", remotePeer.String()).Msg("get refused")
|
||||
s.Reset()
|
||||
|
||||
@@ -74,6 +74,11 @@ func (ix *IndexerService) handleSearchPeer(s network.Stream) {
|
||||
logger := oclib.GetLogger()
|
||||
defer s.Reset()
|
||||
|
||||
if !ix.isPeerKnown(s.Conn().RemotePeer()) {
|
||||
logger.Warn().Str("peer", s.Conn().RemotePeer().String()).Msg("[search] unknown peer, rejecting stream")
|
||||
return
|
||||
}
|
||||
|
||||
var req common.SearchPeerRequest
|
||||
if err := json.NewDecoder(s).Decode(&req); err != nil || req.QueryID == "" {
|
||||
return
|
||||
|
||||
@@ -162,7 +162,7 @@ func NewIndexerService(h host.Host, ps *pubsub.PubSub, maxNode int) *IndexerServ
|
||||
|
||||
// Build and send a HeartbeatResponse after each received node heartbeat.
|
||||
// Raw metrics only — no pre-cooked score. Node computes the score itself.
|
||||
ix.BuildHeartbeatResponse = func(remotePeer pp.ID, need int, challenges []string, challengeDID string, referent bool) *common.HeartbeatResponse {
|
||||
ix.BuildHeartbeatResponse = func(remotePeer pp.ID, need int, challenges []string, challengeDID string, referent bool, rawRecord json.RawMessage) *common.HeartbeatResponse {
|
||||
ix.StreamMU.RLock()
|
||||
peerCount := len(ix.StreamRecords[common.ProtocolHeartbeat])
|
||||
// Collect lastSeen per active peer for challenge responses.
|
||||
@@ -184,6 +184,16 @@ func NewIndexerService(h host.Host, ps *pubsub.PubSub, maxNode int) *IndexerServ
|
||||
}
|
||||
ix.StreamMU.RUnlock()
|
||||
|
||||
// AfterHeartbeat updates srec.Record asynchronously — it may not have run yet.
|
||||
// Use rawRecord (the fresh signed PeerRecord embedded in the heartbeat) directly
|
||||
// so referencedNodes always gets the current Name/DID regardless of timing.
|
||||
if remotePeerRecord.Name == "" && len(rawRecord) > 0 {
|
||||
var fresh PeerRecord
|
||||
if json.Unmarshal(rawRecord, &fresh) == nil {
|
||||
remotePeerRecord = fresh
|
||||
}
|
||||
}
|
||||
|
||||
// Update referent designation: node marks its best-scored indexer with Referent=true.
|
||||
ix.updateReferent(remotePeer, remotePeerRecord, referent)
|
||||
|
||||
|
||||
@@ -6,10 +6,10 @@ import (
|
||||
"fmt"
|
||||
"oc-discovery/daemons/node/common"
|
||||
"oc-discovery/daemons/node/stream"
|
||||
"slices"
|
||||
|
||||
oclib "cloud.o-forge.io/core/oc-lib"
|
||||
"cloud.o-forge.io/core/oc-lib/config"
|
||||
"cloud.o-forge.io/core/oc-lib/models/peer"
|
||||
"cloud.o-forge.io/core/oc-lib/tools"
|
||||
pp "github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/libp2p/go-libp2p/core/protocol"
|
||||
@@ -25,81 +25,17 @@ type executionConsidersPayload struct {
|
||||
|
||||
func ListenNATS(n *Node) {
|
||||
tools.NewNATSCaller().ListenNats(map[tools.NATSMethod]func(tools.NATSResponse){
|
||||
/*tools.VERIFY_RESOURCE: func(resp tools.NATSResponse) {
|
||||
if resp.FromApp == config.GetAppName() {
|
||||
return
|
||||
}
|
||||
if res, err := resources.ToResource(resp.Datatype.EnumIndex(), resp.Payload); err == nil {
|
||||
access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil)
|
||||
p := access.LoadOne(res.GetCreatorID())
|
||||
realP := p.ToPeer()
|
||||
if realP == nil {
|
||||
return
|
||||
} else if realP.Relation == peer.SELF {
|
||||
pubKey, err := common.PubKeyFromString(realP.PublicKey) // extract pubkey from pubkey str
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
ok, _ := pubKey.Verify(resp.Payload, res.GetSignature())
|
||||
if b, err := json.Marshal(stream.Verify{
|
||||
IsVerified: ok,
|
||||
}); err == nil {
|
||||
tools.NewNATSCaller().SetNATSPub(tools.VERIFY_RESOURCE, tools.NATSResponse{
|
||||
FromApp: "oc-discovery",
|
||||
Method: int(tools.VERIFY_RESOURCE),
|
||||
Payload: b,
|
||||
})
|
||||
}
|
||||
} else if realP.Relation != peer.BLACKLIST {
|
||||
n.StreamService.PublishVerifyResources(&resp.Datatype, resp.User, realP.PeerID, resp.Payload)
|
||||
}
|
||||
}
|
||||
},*/
|
||||
tools.CREATE_RESOURCE: func(resp tools.NATSResponse) {
|
||||
if resp.FromApp == config.GetAppName() && resp.Datatype != tools.PEER && resp.Datatype != tools.WORKFLOW {
|
||||
return
|
||||
}
|
||||
logger := oclib.GetLogger()
|
||||
m := map[string]interface{}{}
|
||||
err := json.Unmarshal(resp.Payload, &m)
|
||||
if err != nil {
|
||||
logger.Err(err)
|
||||
return
|
||||
}
|
||||
p := &peer.Peer{}
|
||||
p = p.Deserialize(m, p).(*peer.Peer)
|
||||
|
||||
ad, err := pp.AddrInfoFromString(p.StreamAddress)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
// Non-partner: close any existing streams for this peer.
|
||||
if p.Relation != peer.PARTNER {
|
||||
n.StreamService.Mu.Lock()
|
||||
defer n.StreamService.Mu.Unlock()
|
||||
ps := common.ProtocolStream{}
|
||||
for proto, s := range n.StreamService.Streams {
|
||||
m := map[pp.ID]*common.Stream{}
|
||||
for k := range s {
|
||||
if ad.ID != k {
|
||||
m[k] = s[k]
|
||||
} else {
|
||||
s[k].Stream.Close()
|
||||
}
|
||||
}
|
||||
ps[proto] = m
|
||||
}
|
||||
n.StreamService.Streams = ps
|
||||
}
|
||||
|
||||
},
|
||||
tools.PROPALGATION_EVENT: func(resp tools.NATSResponse) {
|
||||
fmt.Println("PROPALGATION")
|
||||
if resp.FromApp == config.GetAppName() {
|
||||
return
|
||||
}
|
||||
p, err := oclib.GetMySelf()
|
||||
if err != nil || p == nil || p.PeerID != n.PeerID.String() {
|
||||
return
|
||||
}
|
||||
var propalgation tools.PropalgationMessage
|
||||
err := json.Unmarshal(resp.Payload, &propalgation)
|
||||
err = json.Unmarshal(resp.Payload, &propalgation)
|
||||
var dt *tools.DataType
|
||||
if propalgation.DataType > 0 {
|
||||
dtt := tools.DataType(propalgation.DataType)
|
||||
@@ -114,30 +50,40 @@ func ListenNATS(n *Node) {
|
||||
if propalgation.Action == tools.PB_MINIO_CONFIG {
|
||||
proto = stream.ProtocolMinioConfigResource
|
||||
}
|
||||
if err := json.Unmarshal(resp.Payload, &m); err == nil {
|
||||
if err := json.Unmarshal(propalgation.Payload, &m); err == nil {
|
||||
peers, _ := n.GetPeerRecord(context.Background(), m.PeerID)
|
||||
for _, p := range peers {
|
||||
n.StreamService.PublishCommon(&resp.Datatype, resp.User, resp.Groups,
|
||||
p.PeerID, proto, resp.Payload)
|
||||
p.PeerID, proto, propalgation.Payload)
|
||||
}
|
||||
}
|
||||
case tools.PB_CREATE, tools.PB_UPDATE, tools.PB_DELETE:
|
||||
fmt.Println(n.StreamService.ToPartnerPublishEvent(
|
||||
context.Background(),
|
||||
propalgation.Action,
|
||||
dt, resp.User, resp.Groups,
|
||||
propalgation.Payload,
|
||||
))
|
||||
if slices.Contains([]tools.DataType{tools.BOOKING, tools.PURCHASE_RESOURCE}, resp.Datatype) {
|
||||
m := map[string]interface{}{}
|
||||
if err := json.Unmarshal(propalgation.Payload, &m); err == nil {
|
||||
if m["peer_id"] != nil {
|
||||
n.StreamService.PublishCommon(&resp.Datatype, resp.User, resp.Groups,
|
||||
fmt.Sprintf("%v", m["peer_id"]), stream.ProtocolCreateResource, propalgation.Payload)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
fmt.Println(n.StreamService.ToPartnerPublishEvent(
|
||||
context.Background(),
|
||||
propalgation.Action,
|
||||
dt, resp.User, resp.Groups,
|
||||
propalgation.Payload,
|
||||
))
|
||||
}
|
||||
case tools.PB_CONSIDERS:
|
||||
switch resp.Datatype {
|
||||
case tools.BOOKING, tools.PURCHASE_RESOURCE, tools.WORKFLOW_EXECUTION:
|
||||
var m executionConsidersPayload
|
||||
if err := json.Unmarshal(resp.Payload, &m); err == nil {
|
||||
if err := json.Unmarshal(propalgation.Payload, &m); err == nil {
|
||||
for _, p := range m.PeerIDs {
|
||||
peers, _ := n.GetPeerRecord(context.Background(), p)
|
||||
for _, pp := range peers {
|
||||
n.StreamService.PublishCommon(&resp.Datatype, resp.User, resp.Groups,
|
||||
pp.PeerID, stream.ProtocolConsidersResource, resp.Payload)
|
||||
pp.PeerID, stream.ProtocolConsidersResource, propalgation.Payload)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -156,10 +102,10 @@ func ListenNATS(n *Node) {
|
||||
}
|
||||
case tools.PB_PLANNER:
|
||||
m := map[string]interface{}{}
|
||||
if err := json.Unmarshal(resp.Payload, &m); err == nil {
|
||||
if err := json.Unmarshal(propalgation.Payload, &m); err == nil {
|
||||
b := []byte{}
|
||||
if len(m) > 1 {
|
||||
b = resp.Payload
|
||||
b = propalgation.Payload
|
||||
}
|
||||
if m["peer_id"] == nil { // send to every active stream
|
||||
n.StreamService.Mu.Lock()
|
||||
@@ -168,10 +114,10 @@ func ListenNATS(n *Node) {
|
||||
n.StreamService.PublishCommon(nil, resp.User, resp.Groups, pid.String(), stream.ProtocolSendPlanner, b)
|
||||
}
|
||||
}
|
||||
n.StreamService.Mu.Unlock()
|
||||
} else {
|
||||
n.StreamService.PublishCommon(nil, resp.User, resp.Groups, fmt.Sprintf("%v", m["peer_id"]), stream.ProtocolSendPlanner, b)
|
||||
}
|
||||
n.StreamService.Mu.Unlock()
|
||||
}
|
||||
case tools.PB_CLOSE_PLANNER:
|
||||
m := map[string]interface{}{}
|
||||
|
||||
@@ -129,6 +129,37 @@ func InitNode(isNode bool, isIndexer bool) (*Node, error) {
|
||||
if node.StreamService, err = stream.InitStream(context.Background(), node.Host, node.PeerID, 1000, node); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
node.StreamService.IsPeerKnown = func(pid pp.ID) bool {
|
||||
// 1. Local DB: known peer (handles blacklist).
|
||||
access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil)
|
||||
results := access.Search(&dbs.Filters{
|
||||
And: map[string][]dbs.Filter{
|
||||
"peer_id": {{Operator: dbs.EQUAL.String(), Value: pid.String()}},
|
||||
},
|
||||
}, pid.String(), false)
|
||||
for _, item := range results.Data {
|
||||
p, ok := item.(*peer.Peer)
|
||||
if !ok || p.PeerID != pid.String() {
|
||||
continue
|
||||
}
|
||||
return p.Relation != peer.BLACKLIST
|
||||
}
|
||||
// 2. Ask a connected indexer → DHT lookup by peer_id.
|
||||
for _, addr := range common.Indexers.GetAddrs() {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
|
||||
s, err := h.NewStream(ctx, addr.Info.ID, common.ProtocolGet)
|
||||
cancel()
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
json.NewEncoder(s).Encode(indexer.GetValue{PeerID: pid.String()})
|
||||
var resp indexer.GetResponse
|
||||
json.NewDecoder(s).Decode(&resp)
|
||||
s.Reset()
|
||||
return resp.Found
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
if node.PubSubService, err = pubsub.InitPubSub(context.Background(), node.Host, node.PS, node, node.StreamService); err != nil {
|
||||
panic(err)
|
||||
@@ -144,7 +175,7 @@ func InitNode(isNode bool, isIndexer bool) (*Node, error) {
|
||||
}
|
||||
}
|
||||
logger.Info().Msg("subscribe to decentralized search flow...")
|
||||
node.SubscribeToSearch(node.PS, &f)
|
||||
go node.SubscribeToSearch(node.PS, &f)
|
||||
logger.Info().Msg("connect to NATS")
|
||||
go ListenNATS(node)
|
||||
logger.Info().Msg("Node is actually running.")
|
||||
@@ -201,7 +232,6 @@ func (d *Node) publishPeerRecord(
|
||||
// Results are pushed to onResult as they arrive; the function returns when
|
||||
// the stream closes (idle timeout, explicit cancel, or indexer unreachable).
|
||||
func (d *Node) SearchPeerRecord(userKey, needle string, onResult func(common.SearchHit)) {
|
||||
fmt.Println("SearchPeerRecord", needle)
|
||||
logger := oclib.GetLogger()
|
||||
|
||||
idleTimeout := common.SearchIdleTimeout()
|
||||
@@ -240,6 +270,7 @@ func (d *Node) SearchPeerRecord(userKey, needle string, onResult func(common.Sea
|
||||
<-ctx.Done()
|
||||
s.SetReadDeadline(time.Now())
|
||||
}()
|
||||
seen := map[string]struct{}{}
|
||||
dec := json.NewDecoder(s)
|
||||
for {
|
||||
var result common.SearchPeerResult
|
||||
@@ -251,6 +282,14 @@ func (d *Node) SearchPeerRecord(userKey, needle string, onResult func(common.Sea
|
||||
}
|
||||
d.peerSearches.ResetIdle(searchKey)
|
||||
for _, hit := range result.Records {
|
||||
key := hit.PeerID
|
||||
if key == "" {
|
||||
key = hit.DID
|
||||
}
|
||||
if _, already := seen[key]; already {
|
||||
continue
|
||||
}
|
||||
seen[key] = struct{}{}
|
||||
onResult(hit)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -45,17 +45,17 @@ func (ps *StreamService) handleEvent(protocol string, evt *common.Event) error {
|
||||
}
|
||||
}
|
||||
if protocol == ProtocolConsidersResource {
|
||||
if err := ps.pass(evt, tools.PB_CONSIDERS); err != nil {
|
||||
if err := ps.pass(evt, tools.CONSIDERS_EVENT); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if protocol == ProtocolAdmiraltyConfigResource {
|
||||
if err := ps.pass(evt, tools.PB_ADMIRALTY_CONFIG); err != nil {
|
||||
if err := ps.pass(evt, tools.ADMIRALTY_CONFIG_EVENT); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if protocol == ProtocolMinioConfigResource {
|
||||
if err := ps.pass(evt, tools.PB_MINIO_CONFIG); err != nil {
|
||||
if err := ps.pass(evt, tools.MINIO_CONFIG_EVENT); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@@ -86,6 +86,7 @@ func (abs *StreamService) verifyResponse(event *common.Event) error { //
|
||||
}
|
||||
|
||||
func (abs *StreamService) sendPlanner(event *common.Event) error { //
|
||||
fmt.Println("sendPlanner", len(event.Payload))
|
||||
if len(event.Payload) == 0 {
|
||||
if plan, err := planner.GenerateShallow(&tools.APIRequest{Admin: true}); err == nil {
|
||||
if b, err := json.Marshal(plan); err == nil {
|
||||
@@ -133,19 +134,13 @@ func (abs *StreamService) retrieveResponse(event *common.Event) error { //
|
||||
return nil
|
||||
}
|
||||
|
||||
func (abs *StreamService) pass(event *common.Event, action tools.PubSubAction) error { //
|
||||
if b, err := json.Marshal(&tools.PropalgationMessage{
|
||||
Action: action,
|
||||
DataType: int(event.DataType),
|
||||
func (abs *StreamService) pass(event *common.Event, method tools.NATSMethod) error { //
|
||||
go tools.NewNATSCaller().SetNATSPub(method, tools.NATSResponse{
|
||||
FromApp: "oc-discovery",
|
||||
Datatype: tools.DataType(event.DataType),
|
||||
Method: int(method),
|
||||
Payload: event.Payload,
|
||||
}); err == nil {
|
||||
go tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{
|
||||
FromApp: "oc-discovery",
|
||||
Datatype: tools.DataType(event.DataType),
|
||||
Method: int(tools.PROPALGATION_EVENT),
|
||||
Payload: b,
|
||||
})
|
||||
}
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -166,8 +161,6 @@ func (ps *StreamService) handleEventFromPartner(evt *common.Event, protocol stri
|
||||
}, evt.From, false)
|
||||
if len(peers.Data) > 0 {
|
||||
p := peers.Data[0].(*peer.Peer)
|
||||
fmt.Println(evt.From, p.GetID(), peers.Data)
|
||||
|
||||
ps.SendResponse(p, evt, fmt.Sprintf("%v", search))
|
||||
} else if p, err := ps.Node.GetPeerRecord(context.Background(), evt.From); err == nil && len(p) > 0 { // peer from is peerID
|
||||
ps.SendResponse(p[0], evt, fmt.Sprintf("%v", search))
|
||||
@@ -182,7 +175,7 @@ func (ps *StreamService) handleEventFromPartner(evt *common.Event, protocol stri
|
||||
})
|
||||
}
|
||||
case ProtocolCreateResource, ProtocolUpdateResource:
|
||||
fmt.Println("RECEIVED Protocol.Update")
|
||||
fmt.Println("RECEIVED Protocol.Update", string(evt.Payload))
|
||||
go tools.NewNATSCaller().SetNATSPub(tools.CREATE_RESOURCE, tools.NATSResponse{
|
||||
FromApp: "oc-discovery",
|
||||
Datatype: tools.DataType(evt.DataType),
|
||||
@@ -218,13 +211,10 @@ func (abs *StreamService) SendResponse(p *peer.Peer, event *common.Event, search
|
||||
} else {
|
||||
for _, dt := range dts {
|
||||
access := oclib.NewRequestAdmin(oclib.LibDataEnum(dt), nil)
|
||||
peerID := p.GetID()
|
||||
searched := access.Search(abs.FilterPeer(self.GetID(), event.Groups, search), "", false)
|
||||
fmt.Println("SEND SEARCH_EVENT", self.GetID(), dt, len(searched.Data), peerID)
|
||||
for _, ss := range searched.Data {
|
||||
if j, err := json.Marshal(ss); err == nil {
|
||||
_, err := abs.PublishCommon(&dt, event.User, event.Groups, p.PeerID, ProtocolSearchResource, j)
|
||||
fmt.Println("Publish ERR", err)
|
||||
abs.PublishCommon(&dt, event.User, event.Groups, p.PeerID, ProtocolSearchResource, j)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -26,7 +26,7 @@ func (ps *StreamService) PublishesCommon(dt *tools.DataType, user string, groups
|
||||
for _, pes := range p.Data {
|
||||
for _, proto := range protos {
|
||||
if _, err := ps.PublishCommon(dt, user, groups, pes.(*peer.Peer).PeerID, proto, resource); err != nil {
|
||||
return err
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -57,7 +57,6 @@ func (ps *StreamService) PublishCommon(dt *tools.DataType, user string, groups [
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
fmt.Println("WRITE")
|
||||
return ps.write(toPeerID, ad, dt, user, resource, proto)
|
||||
}
|
||||
return nil, errors.New("peer unvalid " + toPeerID)
|
||||
@@ -131,21 +130,17 @@ func (s *StreamService) write(
|
||||
return nil, errors.New("no stream available for protocol " + fmt.Sprintf("%v", proto) + " from PID " + peerID.ID.String())
|
||||
|
||||
}
|
||||
if self, err := oclib.GetMySelf(); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
stream := s.Streams[proto][peerID.ID]
|
||||
evt := common.NewEvent(string(proto), self.PeerID, dt, user, payload)
|
||||
fmt.Println("SEND EVENT ", peerID, proto, evt.From, evt.DataType, evt.Timestamp)
|
||||
if err := json.NewEncoder(stream.Stream).Encode(evt); err != nil {
|
||||
stream.Stream.Close()
|
||||
logger.Err(err)
|
||||
return nil, err
|
||||
}
|
||||
if protocolInfo, ok := protocols[proto]; ok && protocolInfo.WaitResponse {
|
||||
go s.readLoop(stream, peerID.ID, proto, &common.ProtocolInfo{PersistantStream: true})
|
||||
}
|
||||
return stream, nil
|
||||
}
|
||||
|
||||
stream := s.Streams[proto][peerID.ID]
|
||||
evt := common.NewEvent(string(proto), s.Host.ID().String(), dt, user, payload)
|
||||
fmt.Println("SEND EVENT ", peerID, proto, evt.From, evt.DataType, evt.Timestamp)
|
||||
if err := json.NewEncoder(stream.Stream).Encode(evt); err != nil {
|
||||
stream.Stream.Close()
|
||||
logger.Err(err)
|
||||
return nil, err
|
||||
}
|
||||
if protocolInfo, ok := protocols[proto]; ok && protocolInfo.WaitResponse {
|
||||
go s.readLoop(stream, peerID.ID, proto, &common.ProtocolInfo{PersistantStream: true})
|
||||
}
|
||||
return stream, nil
|
||||
}
|
||||
|
||||
@@ -51,13 +51,16 @@ var protocolsPartners = map[protocol.ID]*common.ProtocolInfo{
|
||||
}
|
||||
|
||||
type StreamService struct {
|
||||
Key pp.ID
|
||||
Host host.Host
|
||||
Node common.DiscoveryPeer
|
||||
Streams common.ProtocolStream
|
||||
maxNodesConn int
|
||||
Mu sync.RWMutex
|
||||
Key pp.ID
|
||||
Host host.Host
|
||||
Node common.DiscoveryPeer
|
||||
Streams common.ProtocolStream
|
||||
maxNodesConn int
|
||||
Mu sync.RWMutex
|
||||
ResourceSearches *common.SearchTracker
|
||||
// IsPeerKnown, when set, is called at stream open for every inbound protocol.
|
||||
// Return false to reset the stream immediately. Left nil until wired by the node.
|
||||
IsPeerKnown func(pid pp.ID) bool
|
||||
}
|
||||
|
||||
func InitStream(ctx context.Context, h host.Host, key pp.ID, maxNode int, node common.DiscoveryPeer) (*StreamService, error) {
|
||||
@@ -71,7 +74,7 @@ func InitStream(ctx context.Context, h host.Host, key pp.ID, maxNode int, node c
|
||||
ResourceSearches: common.NewSearchTracker(),
|
||||
}
|
||||
for proto := range protocols {
|
||||
service.Host.SetStreamHandler(proto, service.HandleResponse)
|
||||
service.Host.SetStreamHandler(proto, service.gate(service.HandleResponse))
|
||||
}
|
||||
logger.Info().Msg("connect to partners...")
|
||||
service.connectToPartners() // we set up a stream
|
||||
@@ -79,6 +82,21 @@ func InitStream(ctx context.Context, h host.Host, key pp.ID, maxNode int, node c
|
||||
return service, nil
|
||||
}
|
||||
|
||||
// gate wraps a stream handler with IsPeerKnown validation.
|
||||
// If the peer is unknown the entire connection is closed and the handler is not called.
|
||||
// IsPeerKnown is read at stream-open time so it works even when set after InitStream.
|
||||
func (s *StreamService) gate(h func(network.Stream)) func(network.Stream) {
|
||||
return func(stream network.Stream) {
|
||||
if s.IsPeerKnown != nil && !s.IsPeerKnown(stream.Conn().RemotePeer()) {
|
||||
logger := oclib.GetLogger()
|
||||
logger.Warn().Str("peer", stream.Conn().RemotePeer().String()).Msg("[stream] unknown peer, closing connection")
|
||||
stream.Conn().Close()
|
||||
return
|
||||
}
|
||||
h(stream)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *StreamService) HandleResponse(stream network.Stream) {
|
||||
s.Mu.Lock()
|
||||
defer s.Mu.Unlock()
|
||||
@@ -119,7 +137,7 @@ func (s *StreamService) connectToPartners() error {
|
||||
go s.readLoop(s.Streams[proto][ss.Conn().RemotePeer()], ss.Conn().RemotePeer(), proto, info)
|
||||
}
|
||||
logger.Info().Msg("SetStreamHandler " + string(proto))
|
||||
s.Host.SetStreamHandler(proto, f)
|
||||
s.Host.SetStreamHandler(proto, s.gate(f))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -4,5 +4,6 @@
|
||||
"NATS_URL": "nats://nats:4222",
|
||||
"NODE_MODE": "node",
|
||||
"NODE_ENDPOINT_PORT": 4003,
|
||||
"NAME": "opencloud-demo-1",
|
||||
"INDEXER_ADDRESSES": "/ip4/172.40.0.2/tcp/4002/p2p/12D3KooWC3GNStak8KCYtJq11Dxiq45EJV53z1ZvKetMcZBeBX6u"
|
||||
}
|
||||
@@ -4,5 +4,6 @@
|
||||
"NATS_URL": "nats://nats:4222",
|
||||
"NODE_MODE": "node",
|
||||
"NODE_ENDPOINT_PORT": 4004,
|
||||
"NAME": "opencloud-demo-2",
|
||||
"INDEXER_ADDRESSES": "/ip4/172.40.0.1/tcp/4001/p2p/12D3KooWGn3j4XqTSrjJDGGpTQERdDV5TPZdhQp87rAUnvQssvQu"
|
||||
}
|
||||
|
||||
2
go.mod
2
go.mod
@@ -3,7 +3,7 @@ module oc-discovery
|
||||
go 1.25.0
|
||||
|
||||
require (
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20260312141150-a335c905b3a2
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20260318143822-5976795d4406
|
||||
github.com/ipfs/go-cid v0.6.0
|
||||
github.com/libp2p/go-libp2p v0.47.0
|
||||
github.com/libp2p/go-libp2p-record v0.3.1
|
||||
|
||||
2
go.sum
2
go.sum
@@ -6,6 +6,8 @@ cloud.o-forge.io/core/oc-lib v0.0.0-20260312073634-2c9c42dd516a h1:oCkb9l/Cvn0x6
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20260312073634-2c9c42dd516a/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20260312141150-a335c905b3a2 h1:DuB6SDThFVJVQ0iI0pZnBqtCE0uW+SNI7R7ndKixu2k=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20260312141150-a335c905b3a2/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20260318143822-5976795d4406 h1:FN1EtRWn228JprAbnY5K863Fzj+SzMqQtKRtwvECbLw=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20260318143822-5976795d4406/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA=
|
||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||
github.com/Masterminds/semver/v3 v3.4.0 h1:Zog+i5UMtVoCU8oKka5P7i9q9HgrJeGzI9SA1Xbatp0=
|
||||
github.com/Masterminds/semver/v3 v3.4.0/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM=
|
||||
|
||||
Reference in New Issue
Block a user