Peerless + New Argo
This commit is contained in:
@@ -37,6 +37,15 @@ type Booking struct {
|
||||
// Authorization: identifies who created this draft and the Check session it belongs to.
|
||||
// Used to verify UPDATE and DELETE orders from remote schedulers.
|
||||
SchedulerPeerID string `json:"scheduler_peer_id,omitempty" bson:"scheduler_peer_id,omitempty"`
|
||||
|
||||
// Peerless is true when the booked resource has no destination peer
|
||||
// (e.g. a public Docker Hub image). No peer confirmation or pricing
|
||||
// negotiation is needed; the booking is stored locally only.
|
||||
Peerless bool `json:"peerless,omitempty" bson:"peerless,omitempty"`
|
||||
|
||||
// OriginRef carries the registry reference of a peerless resource
|
||||
// (e.g. "docker.io/pytorch/pytorch:2.1") so schedulers can validate it.
|
||||
OriginRef string `json:"origin_ref,omitempty" bson:"origin_ref,omitempty"`
|
||||
}
|
||||
|
||||
func (b *Booking) CalcDeltaOfExecution() map[string]map[string]models.MetricResume {
|
||||
|
||||
@@ -3,6 +3,7 @@ package peer
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"cloud.o-forge.io/core/oc-lib/models/utils"
|
||||
"cloud.o-forge.io/core/oc-lib/tools"
|
||||
@@ -41,6 +42,15 @@ func (m PeerRelation) EnumIndex() int {
|
||||
return int(m)
|
||||
}
|
||||
|
||||
// BehaviorWarning records a single misbehavior observed by a trusted service.
|
||||
type BehaviorWarning struct {
|
||||
At time.Time `json:"at" bson:"at"`
|
||||
ReporterApp string `json:"reporter_app" bson:"reporter_app"`
|
||||
Severity tools.BehaviorSeverity `json:"severity" bson:"severity"`
|
||||
Reason string `json:"reason" bson:"reason"`
|
||||
Evidence string `json:"evidence,omitempty" bson:"evidence,omitempty"`
|
||||
}
|
||||
|
||||
// Peer is a struct that represents a peer
|
||||
type Peer struct {
|
||||
utils.AbstractObject
|
||||
@@ -56,12 +66,53 @@ type Peer struct {
|
||||
Relation PeerRelation `json:"relation" bson:"relation" default:"0"`
|
||||
ServicesState map[string]int `json:"services_state,omitempty" bson:"services_state,omitempty"`
|
||||
FailedExecution []PeerExecution `json:"failed_execution" bson:"failed_execution"` // FailedExecution is the list of failed executions, to be retried
|
||||
|
||||
// Trust scoring — maintained by oc-discovery from PEER_BEHAVIOR_EVENT reports.
|
||||
TrustScore float64 `json:"trust_score" bson:"trust_score" default:"100"`
|
||||
BlacklistReason string `json:"blacklist_reason,omitempty" bson:"blacklist_reason,omitempty"`
|
||||
BehaviorWarnings []BehaviorWarning `json:"behavior_warnings,omitempty" bson:"behavior_warnings,omitempty"`
|
||||
}
|
||||
|
||||
func (ao *Peer) VerifyAuth(callName string, request *tools.APIRequest) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// BlacklistThreshold is the trust score at or below which a peer is auto-blacklisted.
|
||||
const BlacklistThreshold = 20.0
|
||||
|
||||
// ApplyBehaviorReport records a misbehavior, deducts the trust penalty, and
|
||||
// returns true when the trust score has fallen below BlacklistThreshold so the
|
||||
// caller can trigger the relation change.
|
||||
func (p *Peer) ApplyBehaviorReport(r tools.PeerBehaviorReport) (shouldBlacklist bool) {
|
||||
p.BehaviorWarnings = append(p.BehaviorWarnings, BehaviorWarning{
|
||||
At: r.At,
|
||||
ReporterApp: r.ReporterApp,
|
||||
Severity: r.Severity,
|
||||
Reason: r.Reason,
|
||||
Evidence: r.Evidence,
|
||||
})
|
||||
if p.TrustScore == 0 {
|
||||
p.TrustScore = 100 // initialise if never set
|
||||
}
|
||||
p.TrustScore -= r.Severity.Penalty()
|
||||
if p.TrustScore < 0 {
|
||||
p.TrustScore = 0
|
||||
}
|
||||
if p.TrustScore <= BlacklistThreshold {
|
||||
p.BlacklistReason = r.Reason
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// ResetTrust clears all behavior history and resets the trust score to 100.
|
||||
// Must be called when a peer relation is manually set to NONE or PARTNER.
|
||||
func (p *Peer) ResetTrust() {
|
||||
p.TrustScore = 100
|
||||
p.BlacklistReason = ""
|
||||
p.BehaviorWarnings = nil
|
||||
}
|
||||
|
||||
// AddExecution adds an execution to the list of failed executions
|
||||
func (ao *Peer) AddExecution(exec PeerExecution) {
|
||||
found := false
|
||||
|
||||
@@ -65,6 +65,10 @@ type ComputeResourceInstance struct {
|
||||
Nodes []*ComputeNode `json:"nodes,omitempty" bson:"nodes,omitempty"`
|
||||
}
|
||||
|
||||
// IsPeerless is always false for compute instances: a compute resource is
|
||||
// infrastructure owned by a peer and can never be declared peerless.
|
||||
func (ri *ComputeResourceInstance) IsPeerless() bool { return false }
|
||||
|
||||
func NewComputeResourceInstance(name string, peerID string) ResourceInstanceITF {
|
||||
return &ComputeResourceInstance{
|
||||
ResourceInstance: ResourceInstance[*ComputeResourcePartnership]{
|
||||
|
||||
@@ -68,6 +68,13 @@ func NewDataInstance(name string, peerID string) ResourceInstanceITF {
|
||||
}
|
||||
|
||||
func (ri *DataInstance) StoreDraftDefault() {
|
||||
// Enforce peerless invariant: a public-origin instance cannot have peer ownership.
|
||||
if ri.Origin.Ref != "" && (ri.CreatorID != "" || len(ri.Partnerships) > 0) {
|
||||
// Strip partnerships and creator: the structural invariant wins.
|
||||
// Origin.Ref presence is the authoritative signal that this is peerless.
|
||||
ri.CreatorID = ""
|
||||
ri.Partnerships = nil
|
||||
}
|
||||
found := false
|
||||
for _, p := range ri.ResourceInstance.Env {
|
||||
if p.Attr == "source" {
|
||||
|
||||
@@ -28,6 +28,8 @@ type ResourceInstanceITF interface {
|
||||
utils.DBObject
|
||||
GetID() string
|
||||
GetName() string
|
||||
GetOrigin() OriginMeta
|
||||
IsPeerless() bool
|
||||
StoreDraftDefault()
|
||||
ClearEnv()
|
||||
FilterInstance(peerID string)
|
||||
|
||||
31
models/resources/origin.go
Normal file
31
models/resources/origin.go
Normal file
@@ -0,0 +1,31 @@
|
||||
package resources
|
||||
|
||||
// OriginType tells where a resource instance comes from.
type OriginType int

// Known origins. OriginPeer is the zero value and therefore the default.
const (
	// OriginPeer marks an instance offered by a known network peer.
	OriginPeer OriginType = iota

	// OriginPublic marks an instance pulled from a public registry
	// (Docker Hub, HuggingFace, etc.). No peer confirmation is needed and
	// access is unrestricted.
	OriginPublic

	// OriginSelf marks a self-hosted instance with no third-party peer.
	OriginSelf
)
|
||||
|
||||
// OriginMeta carries provenance information for a resource instance.
|
||||
type OriginMeta struct {
|
||||
Type OriginType `json:"origin_type" bson:"origin_type"`
|
||||
Ref string `json:"origin_ref,omitempty" bson:"origin_ref,omitempty"` // e.g. "docker.io/pytorch/pytorch:2.1"
|
||||
License string `json:"origin_license,omitempty" bson:"origin_license,omitempty"` // SPDX identifier or free-form
|
||||
Verified bool `json:"origin_verified" bson:"origin_verified"` // manually vetted by an OC admin
|
||||
}
|
||||
|
||||
// IsPeerless MUST NOT be used for authorization decisions.
|
||||
// Use ResourceInstance.IsPeerless() instead, which derives the property
|
||||
// from structural invariants rather than this self-declared field.
|
||||
//
|
||||
// This method is kept only for display/logging purposes.
|
||||
func (o OriginMeta) DeclaredPeerless() bool {
|
||||
return o.Type != OriginPeer
|
||||
}
|
||||
@@ -50,6 +50,15 @@ type ProcessingInstance struct {
|
||||
Access *ProcessingResourceAccess `json:"access,omitempty" bson:"access,omitempty"` // Access is the access
|
||||
}
|
||||
|
||||
func (ri *ProcessingInstance) StoreDraftDefault() {
|
||||
// Enforce peerless invariant: a public-origin instance cannot have peer ownership.
|
||||
if ri.Origin.Ref != "" && (ri.CreatorID != "" || len(ri.Partnerships) > 0) {
|
||||
ri.CreatorID = ""
|
||||
ri.Partnerships = nil
|
||||
}
|
||||
ri.ResourceInstance.StoreDraftDefault()
|
||||
}
|
||||
|
||||
func NewProcessingInstance(name string, peerID string) ResourceInstanceITF {
|
||||
return &ProcessingInstance{
|
||||
ResourceInstance: ResourceInstance[*ResourcePartnerShip[*ProcessingResourcePricingProfile]]{
|
||||
|
||||
@@ -175,6 +175,12 @@ func VerifyAuthAction[T ResourceInstanceITF](baseInstance []T, request *tools.AP
|
||||
if len(instanceID) > 0 && !slices.Contains(instanceID, instance.GetID()) {
|
||||
continue
|
||||
}
|
||||
// Structurally peerless instances (no creator, no partnerships, non-empty Ref)
|
||||
// are freely accessible by any requester.
|
||||
if instance.IsPeerless() {
|
||||
instances = append(instances, instance)
|
||||
continue
|
||||
}
|
||||
_, peerGroups := instance.GetPeerGroups()
|
||||
for _, peers := range peerGroups {
|
||||
if request == nil {
|
||||
@@ -206,6 +212,7 @@ type GeoPoint struct {
|
||||
|
||||
type ResourceInstance[T ResourcePartnerITF] struct {
|
||||
utils.AbstractObject
|
||||
Origin OriginMeta `json:"origin,omitempty" bson:"origin,omitempty"`
|
||||
Location GeoPoint `json:"location,omitempty" bson:"location,omitempty"`
|
||||
Country countries.CountryCode `json:"country,omitempty" bson:"country,omitempty"`
|
||||
AccessProtocol string `json:"access_protocol,omitempty" bson:"access_protocol,omitempty"`
|
||||
@@ -231,6 +238,19 @@ func NewInstance[T ResourcePartnerITF](name string) *ResourceInstance[T] {
|
||||
}
|
||||
}
|
||||
|
||||
func (ri *ResourceInstance[T]) GetOrigin() OriginMeta {
|
||||
return ri.Origin
|
||||
}
|
||||
|
||||
// IsPeerless returns true when the instance has no owning peer and a non-empty
|
||||
// registry reference. This is derived from structural invariants — NOT from the
|
||||
// self-declared Origin.Type field — to prevent auth bypass via metadata manipulation:
|
||||
//
|
||||
// CreatorID == "" ∧ len(Partnerships) == 0 ∧ Origin.Ref != ""
|
||||
func (ri *ResourceInstance[T]) IsPeerless() bool {
|
||||
return ri.CreatorID == "" && len(ri.Partnerships) == 0 && ri.Origin.Ref != ""
|
||||
}
|
||||
|
||||
func (ri *ResourceInstance[T]) FilterInstance(peerID string) {
|
||||
partnerships := []T{}
|
||||
for _, p := range ri.Partnerships {
|
||||
@@ -249,6 +269,9 @@ func (ri *ResourceInstance[T]) ClearEnv() {
|
||||
}
|
||||
|
||||
func (ri *ResourceInstance[T]) GetProfile(peerID string, partnershipIndex *int, buyingIndex *int, strategyIndex *int) pricing.PricingProfileITF {
|
||||
if ri.IsPeerless() {
|
||||
return pricing.GetDefaultPricingProfile()
|
||||
}
|
||||
if partnershipIndex != nil && len(ri.Partnerships) > *partnershipIndex {
|
||||
prts := ri.Partnerships[*partnershipIndex]
|
||||
return prts.GetProfile(buyingIndex, strategyIndex)
|
||||
@@ -262,6 +285,9 @@ func (ri *ResourceInstance[T]) GetProfile(peerID string, partnershipIndex *int,
|
||||
}
|
||||
|
||||
func (ri *ResourceInstance[T]) GetPricingsProfiles(peerID string, groups []string) []pricing.PricingProfileITF {
|
||||
if ri.IsPeerless() {
|
||||
return []pricing.PricingProfileITF{pricing.GetDefaultPricingProfile()}
|
||||
}
|
||||
pricings := []pricing.PricingProfileITF{}
|
||||
for _, p := range ri.Partnerships {
|
||||
pricings = append(pricings, p.GetPricingsProfiles(peerID, groups)...)
|
||||
@@ -277,6 +303,10 @@ func (ri *ResourceInstance[T]) GetPricingsProfiles(peerID string, groups []strin
|
||||
}
|
||||
|
||||
func (ri *ResourceInstance[T]) GetPeerGroups() ([]ResourcePartnerITF, []map[string][]string) {
|
||||
// Structurally peerless: universally accessible — wildcard on all peers.
|
||||
if ri.IsPeerless() {
|
||||
return []ResourcePartnerITF{}, []map[string][]string{{"*": {"*"}}}
|
||||
}
|
||||
groups := []map[string][]string{}
|
||||
partners := []ResourcePartnerITF{}
|
||||
for _, p := range ri.Partnerships {
|
||||
|
||||
@@ -57,6 +57,10 @@ type StorageResourceInstance struct {
|
||||
Throughput string `bson:"throughput,omitempty" json:"throughput,omitempty"` // Throughput is the throughput of the storage
|
||||
}
|
||||
|
||||
// IsPeerless is always false for storage instances: a storage resource is
|
||||
// infrastructure owned by a peer and can never be declared peerless.
|
||||
func (ri *StorageResourceInstance) IsPeerless() bool { return false }
|
||||
|
||||
func NewStorageResourceInstance(name string, peerID string) ResourceInstanceITF {
|
||||
return &StorageResourceInstance{
|
||||
ResourceInstance: ResourceInstance[*StorageResourcePartnership]{
|
||||
|
||||
@@ -242,6 +242,7 @@ func (d *WorkflowExecution) bookEach(executionsID string, wfID string, dt tools.
|
||||
InstanceID: priced.GetInstanceID(),
|
||||
ResourceType: dt,
|
||||
DestPeerID: priced.GetCreatorID(),
|
||||
Peerless: priced.GetCreatorID() == "",
|
||||
WorkflowID: wfID,
|
||||
ExecutionID: d.GetID(),
|
||||
ExpectedStartDate: start,
|
||||
|
||||
@@ -200,9 +200,9 @@ func (k *KubernetesService) ProvisionExecutionNamespace(ctx context.Context, ns
|
||||
}
|
||||
role := "argo-role"
|
||||
if err := k.CreateRole(ctx, ns, role,
|
||||
[][]string{{"coordination.k8s.io"}, {""}, {""}, {"multicluster.admiralty.io"}},
|
||||
[][]string{{"leases"}, {"secrets"}, {"pods"}, {"podchaperons"}},
|
||||
[][]string{{"get", "create", "update"}, {"get"}, {"patch"}, {"get", "list", "watch", "create", "update", "patch", "delete"}},
|
||||
[][]string{{"coordination.k8s.io"}, {""}, {""}, {"multicluster.admiralty.io"}, {"argoproj.io"}},
|
||||
[][]string{{"leases"}, {"secrets"}, {"pods"}, {"podchaperons"}, {"workflowtaskresults"}},
|
||||
[][]string{{"get", "create", "update"}, {"get"}, {"patch"}, {"get", "list", "watch", "create", "update", "patch", "delete"}, {"create", "patch"}},
|
||||
); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -618,6 +618,7 @@ func (k *KubernetesService) CreatePVC(ctx context.Context, name, namespace, stor
|
||||
PersistentVolumeSource: v1.PersistentVolumeSource{
|
||||
HostPath: &v1.HostPathVolumeSource{
|
||||
Path: "/var/lib/oc-storage/" + name,
|
||||
Type: func() *v1.HostPathType { t := v1.HostPathDirectoryOrCreate; return &t }(),
|
||||
},
|
||||
},
|
||||
ClaimRef: &v1.ObjectReference{
|
||||
|
||||
@@ -31,6 +31,7 @@ var meths = []string{"remove execution", "create execution", "planner execution"
|
||||
"propalgation event", "search event", "confirm event",
|
||||
"considers event", "admiralty config event", "minio config event", "pvc config event",
|
||||
"workflow started event", "workflow step done event", "workflow done event",
|
||||
"peer behavior event",
|
||||
}
|
||||
|
||||
const (
|
||||
@@ -61,6 +62,12 @@ const (
|
||||
WORKFLOW_STARTED_EVENT
|
||||
WORKFLOW_STEP_DONE_EVENT
|
||||
WORKFLOW_DONE_EVENT
|
||||
|
||||
// PEER_BEHAVIOR_EVENT is emitted by any trusted service (oc-scheduler,
|
||||
// oc-datacenter, …) when a peer exhibits suspicious or fraudulent behavior.
|
||||
// oc-discovery consumes it to update the peer's trust score and auto-blacklist
|
||||
// below threshold.
|
||||
PEER_BEHAVIOR_EVENT
|
||||
)
|
||||
|
||||
func (n NATSMethod) String() string {
|
||||
|
||||
49
tools/peer_behavior.go
Normal file
49
tools/peer_behavior.go
Normal file
@@ -0,0 +1,49 @@
|
||||
package tools
|
||||
|
||||
import "time"
|
||||
|
||||
// BehaviorSeverity grades how serious a peer misbehavior is.
type BehaviorSeverity int

// Severity levels, from mildest to gravest. BehaviorWarn is the zero value.
const (
	// BehaviorWarn is a minor inconsistency; it carries a slight trust penalty.
	BehaviorWarn BehaviorSeverity = iota

	// BehaviorFraud is deliberate data manipulation (e.g. fake peerless Ref,
	// invalid booking); it carries a significant trust penalty.
	BehaviorFraud

	// BehaviorCritical is severe abuse (secret exfiltration, data corruption,
	// system-level attack); it carries a heavy penalty leading to a
	// near-immediate blacklist.
	BehaviorCritical
)
|
||||
|
||||
// scorePenalties maps each severity to a trust-score deduction (out of 100).
|
||||
var scorePenalties = map[BehaviorSeverity]float64{
|
||||
BehaviorWarn: 5,
|
||||
BehaviorFraud: 20,
|
||||
BehaviorCritical: 40,
|
||||
}
|
||||
|
||||
// Penalty returns the trust-score deduction for this severity.
|
||||
func (s BehaviorSeverity) Penalty() float64 {
|
||||
if p, ok := scorePenalties[s]; ok {
|
||||
return p
|
||||
}
|
||||
return 5
|
||||
}
|
||||
|
||||
// PeerBehaviorReport is the payload carried by PEER_BEHAVIOR_EVENT.
|
||||
// Any trusted service can emit it; oc-discovery is the sole consumer.
|
||||
type PeerBehaviorReport struct {
|
||||
// ReporterApp identifies the emitting service (e.g. "oc-scheduler", "oc-datacenter").
|
||||
ReporterApp string `json:"reporter_app"`
|
||||
// TargetPeerID is the MongoDB DID (_id) of the offending peer.
|
||||
TargetPeerID string `json:"target_peer_id"`
|
||||
// Severity drives how much the trust score drops.
|
||||
Severity BehaviorSeverity `json:"severity"`
|
||||
// Reason is a human-readable description shown in the blacklist warning.
|
||||
Reason string `json:"reason"`
|
||||
// Evidence is an optional reference (booking ID, resource Ref, …).
|
||||
Evidence string `json:"evidence,omitempty"`
|
||||
// At is the timestamp of the observed misbehavior.
|
||||
At time.Time `json:"at"`
|
||||
}
|
||||
Reference in New Issue
Block a user