From 6d0c78946e8b7278a1be04b2e572ad867b349274 Mon Sep 17 00:00:00 2001 From: mr Date: Tue, 24 Mar 2026 12:49:37 +0100 Subject: [PATCH] Peerless + New Argo --- models/booking/booking.go | 9 ++++ models/peer/peer.go | 51 +++++++++++++++++++ models/resources/compute.go | 4 ++ models/resources/data.go | 7 +++ models/resources/interfaces.go | 2 + models/resources/origin.go | 31 +++++++++++ models/resources/processing.go | 9 ++++ models/resources/resource.go | 30 +++++++++++ models/resources/storage.go | 4 ++ .../workflow_execution/workflow_execution.go | 1 + tools/kubernetes.go | 7 +-- tools/nats_caller.go | 7 +++ tools/peer_behavior.go | 49 ++++++++++++++++++ 13 files changed, 208 insertions(+), 3 deletions(-) create mode 100644 models/resources/origin.go create mode 100644 tools/peer_behavior.go diff --git a/models/booking/booking.go b/models/booking/booking.go index 9e2141d..785f5d9 100644 --- a/models/booking/booking.go +++ b/models/booking/booking.go @@ -37,6 +37,15 @@ type Booking struct { // Authorization: identifies who created this draft and the Check session it belongs to. // Used to verify UPDATE and DELETE orders from remote schedulers. SchedulerPeerID string `json:"scheduler_peer_id,omitempty" bson:"scheduler_peer_id,omitempty"` + + // Peerless is true when the booked resource has no destination peer + // (e.g. a public Docker Hub image). No peer confirmation or pricing + // negotiation is needed; the booking is stored locally only. + Peerless bool `json:"peerless,omitempty" bson:"peerless,omitempty"` + + // OriginRef carries the registry reference of a peerless resource + // (e.g. "docker.io/pytorch/pytorch:2.1") so schedulers can validate it. 
+ OriginRef string `json:"origin_ref,omitempty" bson:"origin_ref,omitempty"` } func (b *Booking) CalcDeltaOfExecution() map[string]map[string]models.MetricResume { diff --git a/models/peer/peer.go b/models/peer/peer.go index 5828766..0819b2e 100644 --- a/models/peer/peer.go +++ b/models/peer/peer.go @@ -3,6 +3,7 @@ package peer import ( "fmt" "strings" + "time" "cloud.o-forge.io/core/oc-lib/models/utils" "cloud.o-forge.io/core/oc-lib/tools" @@ -41,6 +42,15 @@ func (m PeerRelation) EnumIndex() int { return int(m) } +// BehaviorWarning records a single misbehavior observed by a trusted service. +type BehaviorWarning struct { + At time.Time `json:"at" bson:"at"` + ReporterApp string `json:"reporter_app" bson:"reporter_app"` + Severity tools.BehaviorSeverity `json:"severity" bson:"severity"` + Reason string `json:"reason" bson:"reason"` + Evidence string `json:"evidence,omitempty" bson:"evidence,omitempty"` +} + // Peer is a struct that represents a peer type Peer struct { utils.AbstractObject @@ -56,12 +66,53 @@ type Peer struct { Relation PeerRelation `json:"relation" bson:"relation" default:"0"` ServicesState map[string]int `json:"services_state,omitempty" bson:"services_state,omitempty"` FailedExecution []PeerExecution `json:"failed_execution" bson:"failed_execution"` // FailedExecution is the list of failed executions, to be retried + + // Trust scoring — maintained by oc-discovery from PEER_BEHAVIOR_EVENT reports. + TrustScore float64 `json:"trust_score" bson:"trust_score" default:"100"` + BlacklistReason string `json:"blacklist_reason,omitempty" bson:"blacklist_reason,omitempty"` + BehaviorWarnings []BehaviorWarning `json:"behavior_warnings,omitempty" bson:"behavior_warnings,omitempty"` } func (ao *Peer) VerifyAuth(callName string, request *tools.APIRequest) bool { return true } +// BlacklistThreshold is the trust score at or below which a peer is auto-blacklisted. 
+const BlacklistThreshold = 20.0 + +// ApplyBehaviorReport records a misbehavior, deducts the trust penalty (clamped at 0), and +// returns true when the trust score has fallen to or below BlacklistThreshold so the +// caller can trigger the relation change; BlacklistReason is then set to r.Reason. +func (p *Peer) ApplyBehaviorReport(r tools.PeerBehaviorReport) (shouldBlacklist bool) { + if p.TrustScore == 0 && len(p.BehaviorWarnings) == 0 { + p.TrustScore = 100 // never scored yet; a score clamped to 0 has warnings recorded and must stay 0 + } + p.BehaviorWarnings = append(p.BehaviorWarnings, BehaviorWarning{ + At: r.At, + ReporterApp: r.ReporterApp, + Severity: r.Severity, + Reason: r.Reason, + Evidence: r.Evidence, + }) + p.TrustScore -= r.Severity.Penalty() + if p.TrustScore < 0 { + p.TrustScore = 0 + } + if p.TrustScore <= BlacklistThreshold { + p.BlacklistReason = r.Reason + return true + } + return false +} + +// ResetTrust clears all behavior history and resets the trust score to 100. +// Must be called when a peer relation is manually set to NONE or PARTNER. +func (p *Peer) ResetTrust() { + p.TrustScore = 100 + p.BlacklistReason = "" + p.BehaviorWarnings = nil +} + // AddExecution adds an execution to the list of failed executions func (ao *Peer) AddExecution(exec PeerExecution) { found := false diff --git a/models/resources/compute.go b/models/resources/compute.go index 7b45e91..d3a536d 100755 --- a/models/resources/compute.go +++ b/models/resources/compute.go @@ -65,6 +65,10 @@ type ComputeResourceInstance struct { Nodes []*ComputeNode `json:"nodes,omitempty" bson:"nodes,omitempty"` } +// IsPeerless is always false for compute instances: a compute resource is +// infrastructure owned by a peer and can never be declared peerless. 
+func (ri *ComputeResourceInstance) IsPeerless() bool { return false } + func NewComputeResourceInstance(name string, peerID string) ResourceInstanceITF { return &ComputeResourceInstance{ ResourceInstance: ResourceInstance[*ComputeResourcePartnership]{ diff --git a/models/resources/data.go b/models/resources/data.go index 7479155..eee55fc 100755 --- a/models/resources/data.go +++ b/models/resources/data.go @@ -68,6 +68,13 @@ func NewDataInstance(name string, peerID string) ResourceInstanceITF { } func (ri *DataInstance) StoreDraftDefault() { + // Enforce peerless invariant: a public-origin instance cannot have peer ownership. + if ri.Origin.Ref != "" && (ri.CreatorID != "" || len(ri.Partnerships) > 0) { + // Strip partnerships and creator: the structural invariant wins. + // Origin.Ref presence is the authoritative signal that this is peerless. + ri.CreatorID = "" + ri.Partnerships = nil + } found := false for _, p := range ri.ResourceInstance.Env { if p.Attr == "source" { diff --git a/models/resources/interfaces.go b/models/resources/interfaces.go index 68b80c2..9f34113 100755 --- a/models/resources/interfaces.go +++ b/models/resources/interfaces.go @@ -28,6 +28,8 @@ type ResourceInstanceITF interface { utils.DBObject GetID() string GetName() string + GetOrigin() OriginMeta + IsPeerless() bool StoreDraftDefault() ClearEnv() FilterInstance(peerID string) diff --git a/models/resources/origin.go b/models/resources/origin.go new file mode 100644 index 0000000..41686db --- /dev/null +++ b/models/resources/origin.go @@ -0,0 +1,31 @@ +package resources + +// OriginType qualifies where a resource instance comes from. +type OriginType int + +const ( + // OriginPeer: instance offered by a known network peer (default). + OriginPeer OriginType = iota + // OriginPublic: instance from a public registry (Docker Hub, HuggingFace, etc.). + // No peer confirmation is needed; access is unrestricted. + OriginPublic + // OriginSelf: self-hosted instance with no third-party peer. 
+ OriginSelf +) + +// OriginMeta carries provenance information for a resource instance. +type OriginMeta struct { + Type OriginType `json:"origin_type" bson:"origin_type"` + Ref string `json:"origin_ref,omitempty" bson:"origin_ref,omitempty"` // e.g. "docker.io/pytorch/pytorch:2.1" + License string `json:"origin_license,omitempty" bson:"origin_license,omitempty"` // SPDX identifier or free-form + Verified bool `json:"origin_verified" bson:"origin_verified"` // manually vetted by an OC admin +} + +// DeclaredPeerless reports whether Type is anything other than OriginPeer; it MUST NOT be used for authorization decisions. +// Use ResourceInstance.IsPeerless() instead, which derives the property +// from structural invariants rather than this self-declared field. +// +// This method is kept only for display/logging purposes. +func (o OriginMeta) DeclaredPeerless() bool { + return o.Type != OriginPeer +} diff --git a/models/resources/processing.go b/models/resources/processing.go index 7ba8afa..c1d624f 100755 --- a/models/resources/processing.go +++ b/models/resources/processing.go @@ -50,6 +50,15 @@ type ProcessingInstance struct { Access *ProcessingResourceAccess `json:"access,omitempty" bson:"access,omitempty"` // Access is the access } +func (ri *ProcessingInstance) StoreDraftDefault() { + // Enforce peerless invariant: an instance carrying a non-empty Origin.Ref cannot have peer ownership. 
+ if ri.Origin.Ref != "" && (ri.CreatorID != "" || len(ri.Partnerships) > 0) { + ri.CreatorID = "" + ri.Partnerships = nil + } + ri.ResourceInstance.StoreDraftDefault() +} + func NewProcessingInstance(name string, peerID string) ResourceInstanceITF { return &ProcessingInstance{ ResourceInstance: ResourceInstance[*ResourcePartnerShip[*ProcessingResourcePricingProfile]]{ diff --git a/models/resources/resource.go b/models/resources/resource.go index 86511e3..e1943a0 100755 --- a/models/resources/resource.go +++ b/models/resources/resource.go @@ -175,6 +175,12 @@ func VerifyAuthAction[T ResourceInstanceITF](baseInstance []T, request *tools.AP if len(instanceID) > 0 && !slices.Contains(instanceID, instance.GetID()) { continue } + // Structurally peerless instances (no creator, no partnerships, non-empty Ref) + // are freely accessible by any requester. + if instance.IsPeerless() { + instances = append(instances, instance) + continue + } _, peerGroups := instance.GetPeerGroups() for _, peers := range peerGroups { if request == nil { @@ -206,6 +212,7 @@ type GeoPoint struct { type ResourceInstance[T ResourcePartnerITF] struct { utils.AbstractObject + Origin OriginMeta `json:"origin,omitempty" bson:"origin,omitempty"` Location GeoPoint `json:"location,omitempty" bson:"location,omitempty"` Country countries.CountryCode `json:"country,omitempty" bson:"country,omitempty"` AccessProtocol string `json:"access_protocol,omitempty" bson:"access_protocol,omitempty"` @@ -231,6 +238,19 @@ func NewInstance[T ResourcePartnerITF](name string) *ResourceInstance[T] { } } +func (ri *ResourceInstance[T]) GetOrigin() OriginMeta { + return ri.Origin +} + +// IsPeerless returns true when the instance has no owning peer and a non-empty +// registry reference. 
This is derived from structural invariants — NOT from the +// self-declared Origin.Type field — to prevent auth bypass via metadata manipulation: +// +// CreatorID == "" ∧ len(Partnerships) == 0 ∧ Origin.Ref != "" +func (ri *ResourceInstance[T]) IsPeerless() bool { + return ri.CreatorID == "" && len(ri.Partnerships) == 0 && ri.Origin.Ref != "" +} + func (ri *ResourceInstance[T]) FilterInstance(peerID string) { partnerships := []T{} for _, p := range ri.Partnerships { @@ -249,6 +269,9 @@ func (ri *ResourceInstance[T]) ClearEnv() { } func (ri *ResourceInstance[T]) GetProfile(peerID string, partnershipIndex *int, buyingIndex *int, strategyIndex *int) pricing.PricingProfileITF { + if ri.IsPeerless() { + return pricing.GetDefaultPricingProfile() + } if partnershipIndex != nil && len(ri.Partnerships) > *partnershipIndex { prts := ri.Partnerships[*partnershipIndex] return prts.GetProfile(buyingIndex, strategyIndex) @@ -262,6 +285,9 @@ func (ri *ResourceInstance[T]) GetProfile(peerID string, partnershipIndex *int, } func (ri *ResourceInstance[T]) GetPricingsProfiles(peerID string, groups []string) []pricing.PricingProfileITF { + if ri.IsPeerless() { + return []pricing.PricingProfileITF{pricing.GetDefaultPricingProfile()} + } pricings := []pricing.PricingProfileITF{} for _, p := range ri.Partnerships { pricings = append(pricings, p.GetPricingsProfiles(peerID, groups)...) @@ -277,6 +303,10 @@ func (ri *ResourceInstance[T]) GetPricingsProfiles(peerID string, groups []strin } func (ri *ResourceInstance[T]) GetPeerGroups() ([]ResourcePartnerITF, []map[string][]string) { + // Structurally peerless: universally accessible — wildcard on all peers. 
+ if ri.IsPeerless() { + return []ResourcePartnerITF{}, []map[string][]string{{"*": {"*"}}} + } groups := []map[string][]string{} partners := []ResourcePartnerITF{} for _, p := range ri.Partnerships { diff --git a/models/resources/storage.go b/models/resources/storage.go index bec2fe4..7df2e5e 100755 --- a/models/resources/storage.go +++ b/models/resources/storage.go @@ -57,6 +57,10 @@ type StorageResourceInstance struct { Throughput string `bson:"throughput,omitempty" json:"throughput,omitempty"` // Throughput is the throughput of the storage } +// IsPeerless is always false for storage instances: a storage resource is +// infrastructure owned by a peer and can never be declared peerless. +func (ri *StorageResourceInstance) IsPeerless() bool { return false } + func NewStorageResourceInstance(name string, peerID string) ResourceInstanceITF { return &StorageResourceInstance{ ResourceInstance: ResourceInstance[*StorageResourcePartnership]{ diff --git a/models/workflow_execution/workflow_execution.go b/models/workflow_execution/workflow_execution.go index e8d9822..354f686 100755 --- a/models/workflow_execution/workflow_execution.go +++ b/models/workflow_execution/workflow_execution.go @@ -242,6 +242,7 @@ func (d *WorkflowExecution) bookEach(executionsID string, wfID string, dt tools. 
InstanceID: priced.GetInstanceID(), ResourceType: dt, DestPeerID: priced.GetCreatorID(), + Peerless: priced.GetCreatorID() == "", WorkflowID: wfID, ExecutionID: d.GetID(), ExpectedStartDate: start, diff --git a/tools/kubernetes.go b/tools/kubernetes.go index f1d85ac..549c5e4 100644 --- a/tools/kubernetes.go +++ b/tools/kubernetes.go @@ -200,9 +200,9 @@ func (k *KubernetesService) ProvisionExecutionNamespace(ctx context.Context, ns } role := "argo-role" if err := k.CreateRole(ctx, ns, role, - [][]string{{"coordination.k8s.io"}, {""}, {""}, {"multicluster.admiralty.io"}}, - [][]string{{"leases"}, {"secrets"}, {"pods"}, {"podchaperons"}}, - [][]string{{"get", "create", "update"}, {"get"}, {"patch"}, {"get", "list", "watch", "create", "update", "patch", "delete"}}, + [][]string{{"coordination.k8s.io"}, {""}, {""}, {"multicluster.admiralty.io"}, {"argoproj.io"}}, + [][]string{{"leases"}, {"secrets"}, {"pods"}, {"podchaperons"}, {"workflowtaskresults"}}, + [][]string{{"get", "create", "update"}, {"get"}, {"patch"}, {"get", "list", "watch", "create", "update", "patch", "delete"}, {"create", "patch"}}, ); err != nil { return err } @@ -618,6 +618,7 @@ func (k *KubernetesService) CreatePVC(ctx context.Context, name, namespace, stor PersistentVolumeSource: v1.PersistentVolumeSource{ HostPath: &v1.HostPathVolumeSource{ Path: "/var/lib/oc-storage/" + name, + Type: func() *v1.HostPathType { t := v1.HostPathDirectoryOrCreate; return &t }(), }, }, ClaimRef: &v1.ObjectReference{ diff --git a/tools/nats_caller.go b/tools/nats_caller.go index 74a5bc1..29a41ab 100644 --- a/tools/nats_caller.go +++ b/tools/nats_caller.go @@ -31,6 +31,7 @@ var meths = []string{"remove execution", "create execution", "planner execution" "propalgation event", "search event", "confirm event", "considers event", "admiralty config event", "minio config event", "pvc config event", "workflow started event", "workflow step done event", "workflow done event", + "peer behavior event", } const ( @@ -61,6 +62,12 @@ 
const ( WORKFLOW_STARTED_EVENT WORKFLOW_STEP_DONE_EVENT WORKFLOW_DONE_EVENT + + // PEER_BEHAVIOR_EVENT is emitted by any trusted service (oc-scheduler, + // oc-datacenter, …) when a peer exhibits suspicious or fraudulent behavior. + // oc-discovery consumes it to update the peer's trust score and auto-blacklist + // below threshold. + PEER_BEHAVIOR_EVENT ) func (n NATSMethod) String() string { diff --git a/tools/peer_behavior.go b/tools/peer_behavior.go new file mode 100644 index 0000000..8aae23e --- /dev/null +++ b/tools/peer_behavior.go @@ -0,0 +1,49 @@ +package tools + +import "time" + +// BehaviorSeverity qualifies the gravity of a peer misbehavior. +type BehaviorSeverity int + +const ( + // BehaviorWarn: minor inconsistency — slight trust penalty. + BehaviorWarn BehaviorSeverity = iota + // BehaviorFraud: deliberate data manipulation (e.g. fake peerless Ref, + // invalid booking) — significant trust penalty. + BehaviorFraud + // BehaviorCritical: severe abuse (secret exfiltration, data corruption, + // system-level attack) — heavy penalty, near-immediate blacklist. + BehaviorCritical +) + +// scorePenalties maps each severity to a trust-score deduction (out of 100). +var scorePenalties = map[BehaviorSeverity]float64{ + BehaviorWarn: 5, + BehaviorFraud: 20, + BehaviorCritical: 40, +} + +// Penalty returns the trust-score deduction for this severity; a severity missing from scorePenalties costs 5. +func (s BehaviorSeverity) Penalty() float64 { + if p, ok := scorePenalties[s]; ok { + return p + } + return 5 +} + +// PeerBehaviorReport is the payload carried by PEER_BEHAVIOR_EVENT. +// Any trusted service can emit it; oc-discovery is the sole consumer. +type PeerBehaviorReport struct { + // ReporterApp identifies the emitting service (e.g. "oc-scheduler", "oc-datacenter"). + ReporterApp string `json:"reporter_app"` + // TargetPeerID is the MongoDB DID (_id) of the offending peer. + TargetPeerID string `json:"target_peer_id"` + // Severity drives how much the trust score drops. 
+ Severity BehaviorSeverity `json:"severity"` + // Reason is a human-readable description shown in the blacklist warning. + Reason string `json:"reason"` + // Evidence is an optional reference (booking ID, resource Ref, …). + Evidence string `json:"evidence,omitempty"` + // At is the timestamp of the observed misbehavior. + At time.Time `json:"at"` +}