13 Commits

Author SHA1 Message Date
mr
c0722483b8 IsNot in catalog strategy 2026-03-31 16:41:12 +02:00
mr
0aee593f29 Not in catalog strategy 2026-03-31 16:40:30 +02:00
mr
a4ab3285e3 Add attr inspired by docker 2026-03-30 10:21:09 +02:00
mr
45f2351b2f OC LIB -> EXTRA 2026-03-27 12:41:31 +01:00
mr
39cb1c715c debug filter on catalog 2026-03-27 12:14:15 +01:00
mr
87cf2cb12a Booking State 2026-03-26 12:02:03 +01:00
mr
4580200e80 Allowed_image 2026-03-25 10:20:16 +01:00
mr
6d0c78946e Peerless + New Argo 2026-03-24 12:49:37 +01:00
mr
211339947c kubernetes + podchaperon 2026-03-23 16:20:20 +01:00
mr
b76b22a8fb Pv + Pvc for admiralty purpose 2026-03-23 12:29:35 +01:00
mr
fa9893e150 pvc immediate 2026-03-23 12:16:29 +01:00
mr
14b449f547 Fusion + Nats Complement 2026-03-23 11:53:21 +01:00
mr
5b197c91e0 Add CreatePVC and DeletePVC to KubernetesService
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-23 11:42:58 +01:00
24 changed files with 443 additions and 31 deletions

View File

@@ -64,6 +64,7 @@ const (
PURCHASE_RESOURCE = tools.PURCHASE_RESOURCE
NATIVE_TOOL = tools.NATIVE_TOOL
EXECUTION_VERIFICATION = tools.EXECUTION_VERIFICATION
ALLOWED_IMAGE = tools.ALLOWED_IMAGE
)
func GetMySelf() (*peer.Peer, error) {
@@ -202,7 +203,7 @@ func ExtractTokenInfo(request http.Request) (string, string, []string) {
if reqToken != "" {
token := strings.Split(reqToken, ".")
if len(token) > 2 {
bytes, err := base64.StdEncoding.DecodeString(token[2])
bytes, err := base64.RawURLEncoding.DecodeString(token[1])
if err != nil {
return "", "", []string{}
}
@@ -217,7 +218,7 @@ func ExtractTokenInfo(request http.Request) (string, string, []string) {
return "", "", []string{}
}
func InitAPI(appName string) {
func InitAPI(appName string, extraRoutes ...map[string][]string) {
InitDaemon(appName)
beego.BConfig.Listen.HTTPPort = config.GetConfig().APIPort
beego.BConfig.WebConfig.DirectoryIndex = true
@@ -231,7 +232,7 @@ func InitAPI(appName string) {
})
beego.InsertFilter("*", beego.BeforeRouter, c)
api := &tools.API{}
api.Discovered(beego.BeeApp.Handlers.GetAllControllerInfo())
api.Discovered(beego.BeeApp.Handlers.GetAllControllerInfo(), extraRoutes...)
}
//

View File

@@ -0,0 +1,56 @@
package allowed_image
import (
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
)
// AllowedImage représente une image de conteneur autorisée à persister
// sur un peer après l'exécution d'un workflow.
//
// La décision de rétention est entièrement locale au datacenter —
// le fournisseur de processing n'a aucun levier sur cette liste.
//
// Règle de matching (côté oc-datacenter) :
// - Registry vide = toutes les registries
// - TagConstraint vide = toutes les versions
// - TagConstraint non vide = exact ou glob (ex: "3.*", "1.2.3")
//
// Les entrées IsDefault sont créées au bootstrap et ne peuvent pas
// être supprimées via l'API.
type AllowedImage struct {
utils.AbstractObject
// Registry source (ex: "docker.io", "registry.example.com").
// Vide = wildcard, accepte n'importe quelle registry.
Registry string `json:"registry,omitempty" bson:"registry,omitempty"`
// Image est le nom de l'image sans registry ni tag
// (ex: "natsio/nats-box", "library/alpine").
Image string `json:"image" bson:"image" validate:"required"`
// TagConstraint est la contrainte sur le tag.
// Vide = toutes les versions autorisées.
// Supporte exact ("1.2.3") ou glob ("3.*", "*-alpine").
TagConstraint string `json:"tag_constraint,omitempty" bson:"tag_constraint,omitempty"`
// IsDefault marque les entrées bootstrap insérées au démarrage.
// Ces entrées ne peuvent pas être supprimées via l'API.
IsDefault bool `json:"is_default,omitempty" bson:"is_default,omitempty"`
}
func (a *AllowedImage) StoreDraftDefault() {
a.IsDraft = false // les allowed images sont actives immédiatement
}
func (a *AllowedImage) CanUpdate(set utils.DBObject) (bool, utils.DBObject) {
return true, set
}
func (a *AllowedImage) CanDelete() bool {
return !a.IsDefault // les entrées bootstrap sont non supprimables
}
func (a *AllowedImage) GetAccessor(request *tools.APIRequest) utils.Accessor {
return NewAccessor(request)
}

View File

@@ -0,0 +1,23 @@
package allowed_image
import (
"cloud.o-forge.io/core/oc-lib/logs"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
)
type allowedImageMongoAccessor struct {
utils.AbstractAccessor[*AllowedImage]
}
func NewAccessor(request *tools.APIRequest) *allowedImageMongoAccessor {
return &allowedImageMongoAccessor{
AbstractAccessor: utils.AbstractAccessor[*AllowedImage]{
Logger: logs.CreateLogger(tools.ALLOWED_IMAGE.String()),
Request: request,
Type: tools.ALLOWED_IMAGE,
New: func() *AllowedImage { return &AllowedImage{} },
NotImplemented: []string{"CopyOne"},
},
}
}

View File

@@ -37,6 +37,15 @@ type Booking struct {
// Authorization: identifies who created this draft and the Check session it belongs to.
// Used to verify UPDATE and DELETE orders from remote schedulers.
SchedulerPeerID string `json:"scheduler_peer_id,omitempty" bson:"scheduler_peer_id,omitempty"`
// Peerless is true when the booked resource has no destination peer
// (e.g. a public Docker Hub image). No peer confirmation or pricing
// negotiation is needed; the booking is stored locally only.
Peerless bool `json:"peerless,omitempty" bson:"peerless,omitempty"`
// OriginRef carries the registry reference of a peerless resource
// (e.g. "docker.io/pytorch/pytorch:2.1") so schedulers can validate it.
OriginRef string `json:"origin_ref,omitempty" bson:"origin_ref,omitempty"`
}
func (b *Booking) CalcDeltaOfExecution() map[string]map[string]models.MetricResume {

View File

@@ -152,8 +152,10 @@ func BookingEstimation(t TimePricingStrategy, price float64, locationDurationInS
// may suppress in pricing strategy -> to set in map
type PricingStrategy[T Strategy] struct {
Price float64 `json:"price" bson:"price" default:"0"` // Price is the Price of the pricing
Currency string `json:"currency" bson:"currency" default:"USD"` // Currency is the currency of the pricing
Price float64 `json:"price" bson:"price" default:"0"` // Price is the Price of the pricing
Currency string `json:"currency" bson:"currency" default:"USD"` // Currency is the currency of the pricing
// NO NEED ?
BuyingStrategy BuyingStrategy `json:"buying_strategy" bson:"buying_strategy" default:"0"` // BuyingStrategy is the buying strategy of the pricing
TimePricingStrategy TimePricingStrategy `json:"time_pricing_strategy" bson:"time_pricing_strategy" default:"0"` // TimePricingStrategy is the time pricing strategy of the pricing
OverrideStrategy T `json:"override_strategy" bson:"override_strategy" default:"-1"` // Modulation is the modulation of the pricing

View File

@@ -2,6 +2,7 @@ package models
import (
"cloud.o-forge.io/core/oc-lib/logs"
"cloud.o-forge.io/core/oc-lib/models/allowed_image"
"cloud.o-forge.io/core/oc-lib/models/bill"
"cloud.o-forge.io/core/oc-lib/models/execution_verification"
"cloud.o-forge.io/core/oc-lib/models/live"
@@ -46,6 +47,7 @@ var ModelsCatalog = map[string]func() utils.DBObject{
tools.LIVE_STORAGE.String(): func() utils.DBObject { return &live.LiveStorage{} },
tools.BILL.String(): func() utils.DBObject { return &bill.Bill{} },
tools.EXECUTION_VERIFICATION.String(): func() utils.DBObject { return &execution_verification.ExecutionVerification{} },
tools.ALLOWED_IMAGE.String(): func() utils.DBObject { return &allowed_image.AllowedImage{} },
}
// Model returns the model object based on the model type

View File

@@ -3,6 +3,7 @@ package peer
import (
"fmt"
"strings"
"time"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
@@ -41,12 +42,22 @@ func (m PeerRelation) EnumIndex() int {
return int(m)
}
// BehaviorWarning records a single misbehavior observed by a trusted service.
type BehaviorWarning struct {
At time.Time `json:"at" bson:"at"`
ReporterApp string `json:"reporter_app" bson:"reporter_app"`
Severity tools.BehaviorSeverity `json:"severity" bson:"severity"`
Reason string `json:"reason" bson:"reason"`
Evidence string `json:"evidence,omitempty" bson:"evidence,omitempty"`
}
// Peer is a struct that represents a peer
type Peer struct {
utils.AbstractObject
Verify bool `json:"verify" bson:"verify"`
PeerID string `json:"peer_id" bson:"peer_id" validate:"required"`
Verify bool `json:"verify" bson:"verify"`
OrganizationID string `json:"organization_id" bson:"organization_id"`
PeerID string `json:"peer_id" bson:"peer_id" validate:"required"`
APIUrl string `json:"api_url" bson:"api_url" validate:"required"` // Url is the URL of the peer (base64url)
StreamAddress string `json:"stream_address" bson:"stream_address" validate:"required"` // Url is the URL of the peer (base64url)
@@ -56,12 +67,53 @@ type Peer struct {
Relation PeerRelation `json:"relation" bson:"relation" default:"0"`
ServicesState map[string]int `json:"services_state,omitempty" bson:"services_state,omitempty"`
FailedExecution []PeerExecution `json:"failed_execution" bson:"failed_execution"` // FailedExecution is the list of failed executions, to be retried
// Trust scoring — maintained by oc-discovery from PEER_BEHAVIOR_EVENT reports.
TrustScore float64 `json:"trust_score" bson:"trust_score" default:"100"`
BlacklistReason string `json:"blacklist_reason,omitempty" bson:"blacklist_reason,omitempty"`
BehaviorWarnings []BehaviorWarning `json:"behavior_warnings,omitempty" bson:"behavior_warnings,omitempty"`
}
func (ao *Peer) VerifyAuth(callName string, request *tools.APIRequest) bool {
return true
}
// BlacklistThreshold is the trust score below which a peer is auto-blacklisted.
const BlacklistThreshold = 20.0
// ApplyBehaviorReport records a misbehavior, deducts the trust penalty, and
// returns true when the trust score has fallen below BlacklistThreshold so the
// caller can trigger the relation change.
func (p *Peer) ApplyBehaviorReport(r tools.PeerBehaviorReport) (shouldBlacklist bool) {
p.BehaviorWarnings = append(p.BehaviorWarnings, BehaviorWarning{
At: r.At,
ReporterApp: r.ReporterApp,
Severity: r.Severity,
Reason: r.Reason,
Evidence: r.Evidence,
})
if p.TrustScore == 0 {
p.TrustScore = 100 // initialise if never set
}
p.TrustScore -= r.Severity.Penalty()
if p.TrustScore < 0 {
p.TrustScore = 0
}
if p.TrustScore <= BlacklistThreshold {
p.BlacklistReason = r.Reason
return true
}
return false
}
// ResetTrust clears all behavior history and resets the trust score to 100.
// Must be called when a peer relation is manually set to NONE or PARTNER.
func (p *Peer) ResetTrust() {
p.TrustScore = 100
p.BlacklistReason = ""
p.BehaviorWarnings = nil
}
// AddExecution adds an execution to the list of failed executions
func (ao *Peer) AddExecution(exec PeerExecution) {
found := false

View File

@@ -42,6 +42,23 @@ func (wfa *peerMongoAccessor) ShouldVerifyAuth() bool {
return !wfa.OverrideAuth
}
/*
TODO : organization_ID est un peer_ID duquel on se revendique faire parti.
Ca implique une clé d'organisation + une demande d'intégration.
Slave-Master IOT
*/
func (dca *peerMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) {
pp, _ := utils.GetMySelf(NewAccessor(&tools.APIRequest{Admin: true}))
if data != nil {
d := data.(*Peer)
if pp != nil && d.OrganizationID != "" && d.OrganizationID == pp.(*Peer).OrganizationID {
d.Relation = PARTNER // defaulting on partner if same organization.
}
}
return utils.GenericStoreOne(data, dca)
}
/*
* Nothing special here, just the basic CRUD operations
*/

View File

@@ -65,6 +65,10 @@ type ComputeResourceInstance struct {
Nodes []*ComputeNode `json:"nodes,omitempty" bson:"nodes,omitempty"`
}
// IsPeerless is always false for compute instances: a compute resource is
// infrastructure owned by a peer and can never be declared peerless.
func (ri *ComputeResourceInstance) IsPeerless() bool { return false }
func NewComputeResourceInstance(name string, peerID string) ResourceInstanceITF {
return &ComputeResourceInstance{
ResourceInstance: ResourceInstance[*ComputeResourcePartnership]{

View File

@@ -68,6 +68,13 @@ func NewDataInstance(name string, peerID string) ResourceInstanceITF {
}
func (ri *DataInstance) StoreDraftDefault() {
// Enforce peerless invariant: a public-origin instance cannot have peer ownership.
if ri.Origin.Ref != "" && (ri.CreatorID != "" || len(ri.Partnerships) > 0) {
// Strip partnerships and creator: the structural invariant wins.
// Origin.Ref presence is the authoritative signal that this is peerless.
ri.CreatorID = ""
ri.Partnerships = nil
}
found := false
for _, p := range ri.ResourceInstance.Env {
if p.Attr == "source" {

View File

@@ -28,6 +28,8 @@ type ResourceInstanceITF interface {
utils.DBObject
GetID() string
GetName() string
GetOrigin() OriginMeta
IsPeerless() bool
StoreDraftDefault()
ClearEnv()
FilterInstance(peerID string)

View File

@@ -0,0 +1,31 @@
package resources
// OriginType qualifies where a resource instance comes from.
type OriginType int
const (
// OriginPeer: instance offered by a known network peer (default).
OriginPeer OriginType = iota
// OriginPublic: instance from a public registry (Docker Hub, HuggingFace, etc.).
// No peer confirmation is needed; access is unrestricted.
OriginPublic
// OriginSelf: self-hosted instance with no third-party peer.
OriginSelf
)
// OriginMeta carries provenance information for a resource instance.
type OriginMeta struct {
Type OriginType `json:"origin_type" bson:"origin_type"`
Ref string `json:"origin_ref,omitempty" bson:"origin_ref,omitempty"` // e.g. "docker.io/pytorch/pytorch:2.1"
License string `json:"origin_license,omitempty" bson:"origin_license,omitempty"` // SPDX identifier or free-form
Verified bool `json:"origin_verified" bson:"origin_verified"` // manually vetted by an OC admin
}
// IsPeerless MUST NOT be used for authorization decisions.
// Use ResourceInstance.IsPeerless() instead, which derives the property
// from structural invariants rather than this self-declared field.
//
// This method is kept only for display/logging purposes.
func (o OriginMeta) DeclaredPeerless() bool {
return o.Type != OriginPeer
}

View File

@@ -47,7 +47,18 @@ type ProcessingResourceAccess struct {
type ProcessingInstance struct {
ResourceInstance[*ResourcePartnerShip[*ProcessingResourcePricingProfile]]
Access *ProcessingResourceAccess `json:"access,omitempty" bson:"access,omitempty"` // Access is the access
Access *ProcessingResourceAccess `json:"access,omitempty" bson:"access,omitempty"` // Access is the access
SizeGB int `json:"size_gb,omitempty" bson:"size_gb,omitempty"`
ContentType string `json:"content_type,omitempty" bson:"content_type,omitempty"`
}
func (ri *ProcessingInstance) StoreDraftDefault() {
// Enforce peerless invariant: a public-origin instance cannot have peer ownership.
if ri.Origin.Ref != "" && (ri.CreatorID != "" || len(ri.Partnerships) > 0) {
ri.CreatorID = ""
ri.Partnerships = nil
}
ri.ResourceInstance.StoreDraftDefault()
}
func NewProcessingInstance(name string, peerID string) ResourceInstanceITF {

View File

@@ -4,6 +4,7 @@ import (
"encoding/json"
"errors"
"slices"
"time"
"cloud.o-forge.io/core/oc-lib/config"
"cloud.o-forge.io/core/oc-lib/dbs"
@@ -175,6 +176,12 @@ func VerifyAuthAction[T ResourceInstanceITF](baseInstance []T, request *tools.AP
if len(instanceID) > 0 && !slices.Contains(instanceID, instance.GetID()) {
continue
}
// Structurally peerless instances (no creator, no partnerships, non-empty Ref)
// are freely accessible by any requester.
if instance.IsPeerless() {
instances = append(instances, instance)
continue
}
_, peerGroups := instance.GetPeerGroups()
for _, peers := range peerGroups {
if request == nil {
@@ -206,6 +213,9 @@ type GeoPoint struct {
type ResourceInstance[T ResourcePartnerITF] struct {
utils.AbstractObject
ContentType string `json:"content_type,omitempty" bson:"content_type,omitempty"`
LastUpdate time.Time `json:"last_update,omitempty" bson:"last_update,omitempty"`
Origin OriginMeta `json:"origin,omitempty" bson:"origin,omitempty"`
Location GeoPoint `json:"location,omitempty" bson:"location,omitempty"`
Country countries.CountryCode `json:"country,omitempty" bson:"country,omitempty"`
AccessProtocol string `json:"access_protocol,omitempty" bson:"access_protocol,omitempty"`
@@ -231,6 +241,19 @@ func NewInstance[T ResourcePartnerITF](name string) *ResourceInstance[T] {
}
}
func (ri *ResourceInstance[T]) GetOrigin() OriginMeta {
return ri.Origin
}
// IsPeerless returns true when the instance has no owning peer and a non-empty
// registry reference. This is derived from structural invariants — NOT from the
// self-declared Origin.Type field — to prevent auth bypass via metadata manipulation:
//
// CreatorID == "" ∧ len(Partnerships) == 0 ∧ Origin.Ref != ""
func (ri *ResourceInstance[T]) IsPeerless() bool {
return ri.CreatorID == "" && len(ri.Partnerships) == 0 && ri.Origin.Ref != ""
}
func (ri *ResourceInstance[T]) FilterInstance(peerID string) {
partnerships := []T{}
for _, p := range ri.Partnerships {
@@ -249,6 +272,9 @@ func (ri *ResourceInstance[T]) ClearEnv() {
}
func (ri *ResourceInstance[T]) GetProfile(peerID string, partnershipIndex *int, buyingIndex *int, strategyIndex *int) pricing.PricingProfileITF {
if ri.IsPeerless() {
return pricing.GetDefaultPricingProfile()
}
if partnershipIndex != nil && len(ri.Partnerships) > *partnershipIndex {
prts := ri.Partnerships[*partnershipIndex]
return prts.GetProfile(buyingIndex, strategyIndex)
@@ -262,6 +288,9 @@ func (ri *ResourceInstance[T]) GetProfile(peerID string, partnershipIndex *int,
}
func (ri *ResourceInstance[T]) GetPricingsProfiles(peerID string, groups []string) []pricing.PricingProfileITF {
if ri.IsPeerless() {
return []pricing.PricingProfileITF{pricing.GetDefaultPricingProfile()}
}
pricings := []pricing.PricingProfileITF{}
for _, p := range ri.Partnerships {
pricings = append(pricings, p.GetPricingsProfiles(peerID, groups)...)
@@ -277,6 +306,10 @@ func (ri *ResourceInstance[T]) GetPricingsProfiles(peerID string, groups []strin
}
func (ri *ResourceInstance[T]) GetPeerGroups() ([]ResourcePartnerITF, []map[string][]string) {
// Structurally peerless: universally accessible — wildcard on all peers.
if ri.IsPeerless() {
return []ResourcePartnerITF{}, []map[string][]string{{"*": {"*"}}}
}
groups := []map[string][]string{}
partners := []ResourcePartnerITF{}
for _, p := range ri.Partnerships {

View File

@@ -108,12 +108,11 @@ func (a *ResourceMongoAccessor[T]) GetExec(isDraft bool) func(utils.DBObject) ut
func (abs *ResourceMongoAccessor[T]) GetObjectFilters(search string) *dbs.Filters {
return &dbs.Filters{
Or: map[string][]dbs.Filter{ // filter by like name, short_description, description, owner, url if no filters are provided
"abstractintanciatedresource.abstractresource.abstractobject.name": {{Operator: dbs.LIKE.String(), Value: search}},
"abstractintanciatedresource.abstractresource.type": {{Operator: dbs.LIKE.String(), Value: search}},
"abstractintanciatedresource.abstractresource.short_description": {{Operator: dbs.LIKE.String(), Value: search}},
"abstractintanciatedresource.abstractresource.description": {{Operator: dbs.LIKE.String(), Value: search}},
"abstractintanciatedresource.abstractresource.owners.name": {{Operator: dbs.LIKE.String(), Value: search}},
"abstractintanciatedresource.abstractresource.abstractobject.creator_id": {{Operator: dbs.EQUAL.String(), Value: search}},
"abstractinstanciatedresource.abstractresource.abstractobject.name": {{Operator: dbs.LIKE.String(), Value: search}},
"abstractinstanciatedresource.abstractresource.type": {{Operator: dbs.LIKE.String(), Value: search}},
"abstractinstanciatedresource.abstractresource.short_description": {{Operator: dbs.LIKE.String(), Value: search}},
"abstractinstanciatedresource.abstractresource.description": {{Operator: dbs.LIKE.String(), Value: search}},
"abstractinstanciatedresource.abstractresource.owners.name": {{Operator: dbs.LIKE.String(), Value: search}},
},
}
}

View File

@@ -57,6 +57,10 @@ type StorageResourceInstance struct {
Throughput string `bson:"throughput,omitempty" json:"throughput,omitempty"` // Throughput is the throughput of the storage
}
// IsPeerless is always false for storage instances: a storage resource is
// infrastructure owned by a peer and can never be declared peerless.
func (ri *StorageResourceInstance) IsPeerless() bool { return false }
func NewStorageResourceInstance(name string, peerID string) ResourceInstanceITF {
return &StorageResourceInstance{
ResourceInstance: ResourceInstance[*StorageResourcePartnership]{

View File

@@ -31,6 +31,7 @@ const (
*/
type AbstractObject struct {
UUID string `json:"id,omitempty" bson:"id,omitempty" validate:"required"`
NotInCatalog bool `json:"not_in_catalog" bson:"not_in_catalog" default:"false"`
Name string `json:"name,omitempty" bson:"name,omitempty" validate:"required"`
IsDraft bool `json:"is_draft" bson:"is_draft" default:"false"`
CreatorID string `json:"creator_id,omitempty" bson:"creator_id,omitempty"`
@@ -47,6 +48,10 @@ func (ri *AbstractObject) GetAccessor(request *tools.APIRequest) Accessor {
return nil
}
func (r *AbstractObject) IsNotInCatalog() bool {
return r.NotInCatalog
}
func (r *AbstractObject) Unsign() {
r.Signature = nil
}

View File

@@ -18,6 +18,7 @@ type ShallowDBObject interface {
// DBObject is an interface that defines the basic methods for a DBObject
type DBObject interface {
GenerateID()
IsNotInCatalog() bool
SetID(id string)
GetID() string
GetName() string

View File

@@ -17,6 +17,17 @@ import (
"go.mongodb.org/mongo-driver/bson/primitive"
)
// BookingState tracks the reservation and completion status of a single booking
// within a workflow execution.
// - IsBooked: true while the resource is actively reserved (set on WORKFLOW_STARTED_EVENT,
// cleared on WORKFLOW_STEP_DONE_EVENT / WORKFLOW_DONE_EVENT).
// - IsDone: true once the booking has been confirmed by the remote peer (CONSIDERS_EVENT)
// or completed (WORKFLOW_STEP_DONE_EVENT / WORKFLOW_DONE_EVENT).
type BookingState struct {
IsBooked bool `json:"is_booked" bson:"is_booked"`
IsDone bool `json:"is_done" bson:"is_done"`
}
/*
* WorkflowExecution is a struct that represents a list of workflow executions
* Warning: No user can write (del, post, put) a workflow execution, it is only used by the system
@@ -33,8 +44,8 @@ type WorkflowExecution struct {
State enum.BookingStatus `json:"state" bson:"state" default:"0"` // TEMPORARY TODO DEFAULT 1 -> 0 State is the state of the workflow
WorkflowID string `json:"workflow_id" bson:"workflow_id,omitempty"` // WorkflowID is the ID of the workflow
BookingsState map[string]bool `json:"bookings_state" bson:"bookings_state,omitempty"` // WorkflowID is the ID of the workflow
PurchasesState map[string]bool `json:"purchases_state" bson:"purchases_state,omitempty"` // WorkflowID is the ID of the workflow
BookingsState map[string]BookingState `json:"bookings_state" bson:"bookings_state,omitempty"` // booking_id → reservation+completion status
PurchasesState map[string]bool `json:"purchases_state" bson:"purchases_state,omitempty"` // purchase_id → confirmed
SelectedInstances workflow.ConfigItem `json:"selected_instances"`
SelectedPartnerships workflow.ConfigItem `json:"selected_partnerships"`
@@ -189,9 +200,9 @@ func (d *WorkflowExecution) Book(executionsID string, wfID string, priceds map[t
booking = append(booking, d.bookEach(executionsID, wfID, tools.DATA_RESOURCE, priceds[tools.DATA_RESOURCE])...)
for _, p := range booking {
if d.BookingsState == nil {
d.BookingsState = map[string]bool{}
d.BookingsState = map[string]BookingState{}
}
d.BookingsState[p.GetID()] = false
d.BookingsState[p.GetID()] = BookingState{}
}
return booking
}
@@ -242,6 +253,7 @@ func (d *WorkflowExecution) bookEach(executionsID string, wfID string, dt tools.
InstanceID: priced.GetInstanceID(),
ResourceType: dt,
DestPeerID: priced.GetCreatorID(),
Peerless: priced.GetCreatorID() == "",
WorkflowID: wfID,
ExecutionID: d.GetID(),
ExpectedStartDate: start,

View File

@@ -60,16 +60,16 @@ func (s State) String() string {
type API struct{}
func (a *API) Discovered(infos []*beego.ControllerInfo) {
func (a *API) Discovered(infos []*beego.ControllerInfo, extra ...map[string][]string) {
respondToDiscovery := func(resp NATSResponse) {
var m map[string]interface{}
json.Unmarshal(resp.Payload, &m)
if len(m) == 0 {
a.SubscribeRouter(infos)
a.SubscribeRouter(infos, extra...)
}
}
a.ListenRouter(respondToDiscovery)
a.SubscribeRouter(infos)
a.SubscribeRouter(infos, extra...)
}
// GetState returns the state of the API
@@ -99,11 +99,12 @@ func (a *API) ListenRouter(exec func(msg NATSResponse)) {
})
}
func (a *API) SubscribeRouter(infos []*beego.ControllerInfo) {
func (a *API) SubscribeRouter(infos []*beego.ControllerInfo, extra ...map[string][]string) {
nats := NewNATSCaller()
appPrefix := "/" + strings.ReplaceAll(config.GetAppName(), "oc-", "")
discovery := map[string][]string{}
for _, info := range infos {
path := strings.ReplaceAll(info.GetPattern(), "/oc/", "/"+strings.ReplaceAll(config.GetAppName(), "oc-", ""))
path := strings.ReplaceAll(info.GetPattern(), "/oc/", appPrefix+"/")
for k, v := range info.GetMethod() {
if discovery[path] == nil {
discovery[path] = []string{}
@@ -115,6 +116,15 @@ func (a *API) SubscribeRouter(infos []*beego.ControllerInfo) {
}
}
}
for _, extraRoutes := range extra {
for rawPath, methods := range extraRoutes {
path := strings.ReplaceAll(rawPath, "/oc/", appPrefix+"/")
if discovery[path] == nil {
discovery[path] = []string{}
}
discovery[path] = append(discovery[path], methods...)
}
}
b, _ := json.Marshal(discovery)
go nats.SetNATSPub(DISCOVERY, NATSResponse{

View File

@@ -32,6 +32,7 @@ const (
BILL
NATIVE_TOOL
EXECUTION_VERIFICATION
ALLOWED_IMAGE
)
var NOAPI = func() string {
@@ -88,6 +89,7 @@ var InnerDefaultAPI = [...]func() string{
NOAPI,
CATALOGAPI,
SCHEDULERAPI,
DATACENTERAPI,
}
// Bind the standard data name to the data type
@@ -114,6 +116,7 @@ var Str = [...]string{
"bill",
"native_tool",
"execution_verification",
"allowed_image",
}
func FromString(comp string) int {
@@ -149,7 +152,7 @@ func DataTypeList() []DataType {
return []DataType{DATA_RESOURCE, PROCESSING_RESOURCE, STORAGE_RESOURCE, COMPUTE_RESOURCE, WORKFLOW_RESOURCE,
WORKFLOW, WORKFLOW_EXECUTION, WORKSPACE, PEER, COLLABORATIVE_AREA, RULE, BOOKING, WORKFLOW_HISTORY, WORKSPACE_HISTORY,
ORDER, PURCHASE_RESOURCE,
LIVE_DATACENTER, LIVE_STORAGE, BILL, NATIVE_TOOL}
LIVE_DATACENTER, LIVE_STORAGE, BILL, NATIVE_TOOL, EXECUTION_VERIFICATION, ALLOWED_IMAGE}
}
type PropalgationMessage struct {
@@ -171,6 +174,7 @@ const (
PB_CONSIDERS
PB_ADMIRALTY_CONFIG
PB_MINIO_CONFIG
PB_PVC_CONFIG
PB_CLOSE_SEARCH
NONE
)

View File

@@ -14,6 +14,7 @@ import (
v1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -97,8 +98,8 @@ func (k *KubernetesService) CreateNamespace(ctx context.Context, ns string) erro
namespace := &v1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: ns,
Labels: map[string]string{
"multicluster-scheduler": "enabled",
Annotations: map[string]string{
"multicluster.admiralty.io/elect": "",
},
},
}
@@ -199,9 +200,9 @@ func (k *KubernetesService) ProvisionExecutionNamespace(ctx context.Context, ns
}
role := "argo-role"
if err := k.CreateRole(ctx, ns, role,
[][]string{{"coordination.k8s.io"}, {""}, {""}},
[][]string{{"leases"}, {"secrets"}, {"pods"}},
[][]string{{"get", "create", "update"}, {"get"}, {"patch"}},
[][]string{{"coordination.k8s.io"}, {""}, {""}, {"multicluster.admiralty.io"}, {"argoproj.io"}},
[][]string{{"leases"}, {"secrets"}, {"pods"}, {"podchaperons"}, {"workflowtaskresults"}},
[][]string{{"get", "create", "update"}, {"get"}, {"patch"}, {"get", "list", "watch", "create", "update", "patch", "delete"}, {"create", "patch"}},
); err != nil {
return err
}
@@ -598,6 +599,75 @@ func (k *KubernetesService) CreateSecret(context context.Context, minioId string
return nil
}
// CreatePVC creates a static PersistentVolume + PersistentVolumeClaim in the given namespace.
// Static provisioning (no StorageClass) avoids the WaitForFirstConsumer deadlock
// with Admiralty virtual nodes — the PVC binds immediately.
func (k *KubernetesService) CreatePVC(ctx context.Context, name, namespace, storageSize string) error {
storageClassName := ""
pv := &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
Spec: v1.PersistentVolumeSpec{
Capacity: v1.ResourceList{
v1.ResourceStorage: resource.MustParse(storageSize),
},
AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
StorageClassName: storageClassName,
PersistentVolumeReclaimPolicy: v1.PersistentVolumeReclaimDelete,
PersistentVolumeSource: v1.PersistentVolumeSource{
HostPath: &v1.HostPathVolumeSource{
Path: "/var/lib/oc-storage/" + name,
Type: func() *v1.HostPathType { t := v1.HostPathDirectoryOrCreate; return &t }(),
},
},
ClaimRef: &v1.ObjectReference{
Namespace: namespace,
Name: name,
},
},
}
_, err := k.Set.CoreV1().PersistentVolumes().Create(ctx, pv, metav1.CreateOptions{})
if err != nil && !apierrors.IsAlreadyExists(err) {
return fmt.Errorf("CreatePV %s: %w", name, err)
}
pvc := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
Spec: v1.PersistentVolumeClaimSpec{
AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
StorageClassName: &storageClassName,
VolumeName: name,
Resources: v1.VolumeResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceStorage: resource.MustParse(storageSize),
},
},
},
}
_, err = k.Set.CoreV1().PersistentVolumeClaims(namespace).Create(ctx, pvc, metav1.CreateOptions{})
if err != nil && !apierrors.IsAlreadyExists(err) {
return fmt.Errorf("CreatePVC %s/%s: %w", namespace, name, err)
}
return nil
}
// DeletePVC deletes a PersistentVolumeClaim and its associated PersistentVolume.
func (k *KubernetesService) DeletePVC(ctx context.Context, name, namespace string) error {
err := k.Set.CoreV1().PersistentVolumeClaims(namespace).Delete(ctx, name, metav1.DeleteOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return fmt.Errorf("DeletePVC %s/%s: %w", namespace, name, err)
}
err = k.Set.CoreV1().PersistentVolumes().Delete(ctx, name, metav1.DeleteOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return fmt.Errorf("DeletePV %s: %w", name, err)
}
return nil
}
// ============== ADMIRALTY ==============
// Returns a concatenation of the peerId and namespace in order for
// kubernetes ressources to have a unique name, under 63 characters

View File

@@ -29,8 +29,9 @@ type NATSMethod int
var meths = []string{"remove execution", "create execution", "planner execution", "discovery",
"workflow event", "argo kube event", "create resource", "remove resource",
"propalgation event", "search event", "confirm event",
"considers event", "admiralty config event", "minio config event",
"considers event", "admiralty config event", "minio config event", "pvc config event",
"workflow started event", "workflow step done event", "workflow done event",
"peer behavior event",
}
const (
@@ -53,6 +54,7 @@ const (
CONSIDERS_EVENT
ADMIRALTY_CONFIG_EVENT
MINIO_CONFIG_EVENT
PVC_CONFIG_EVENT
// Workflow lifecycle events emitted by oc-monitord.
// oc-scheduler listens to STARTED and DONE to maintain WorkflowExecution state.
@@ -60,6 +62,12 @@ const (
WORKFLOW_STARTED_EVENT
WORKFLOW_STEP_DONE_EVENT
WORKFLOW_DONE_EVENT
// PEER_BEHAVIOR_EVENT is emitted by any trusted service (oc-scheduler,
// oc-datacenter, …) when a peer exhibits suspicious or fraudulent behavior.
// oc-discovery consumes it to update the peer's trust score and auto-blacklist
// below threshold.
PEER_BEHAVIOR_EVENT
)
func (n NATSMethod) String() string {
@@ -70,7 +78,7 @@ func (n NATSMethod) String() string {
func NameToMethod(name string) NATSMethod {
for _, v := range [...]NATSMethod{REMOVE_EXECUTION, CREATE_EXECUTION, PLANNER_EXECUTION, DISCOVERY, WORKFLOW_EVENT, ARGO_KUBE_EVENT,
CREATE_RESOURCE, REMOVE_RESOURCE, PROPALGATION_EVENT, SEARCH_EVENT, CONFIRM_EVENT,
CONSIDERS_EVENT, ADMIRALTY_CONFIG_EVENT, MINIO_CONFIG_EVENT,
CONSIDERS_EVENT, ADMIRALTY_CONFIG_EVENT, MINIO_CONFIG_EVENT, PVC_CONFIG_EVENT,
WORKFLOW_STARTED_EVENT, WORKFLOW_STEP_DONE_EVENT, WORKFLOW_DONE_EVENT} {
if strings.Contains(strings.ToLower(v.String()), strings.ToLower(name)) {
return v

49
tools/peer_behavior.go Normal file
View File

@@ -0,0 +1,49 @@
package tools
import "time"
// BehaviorSeverity qualifies the gravity of a peer misbehavior.
type BehaviorSeverity int
const (
// BehaviorWarn: minor inconsistency — slight trust penalty.
BehaviorWarn BehaviorSeverity = iota
// BehaviorFraud: deliberate data manipulation (e.g. fake peerless Ref,
// invalid booking) — significant trust penalty.
BehaviorFraud
// BehaviorCritical: severe abuse (secret exfiltration, data corruption,
// system-level attack) — heavy penalty, near-immediate blacklist.
BehaviorCritical
)
// scorePenalties maps each severity to a trust-score deduction (out of 100).
var scorePenalties = map[BehaviorSeverity]float64{
BehaviorWarn: 5,
BehaviorFraud: 20,
BehaviorCritical: 40,
}
// Penalty returns the trust-score deduction for this severity.
func (s BehaviorSeverity) Penalty() float64 {
if p, ok := scorePenalties[s]; ok {
return p
}
return 5
}
// PeerBehaviorReport is the payload carried by PEER_BEHAVIOR_EVENT.
// Any trusted service can emit it; oc-discovery is the sole consumer.
type PeerBehaviorReport struct {
// ReporterApp identifies the emitting service (e.g. "oc-scheduler", "oc-datacenter").
ReporterApp string `json:"reporter_app"`
// TargetPeerID is the MongoDB DID (_id) of the offending peer.
TargetPeerID string `json:"target_peer_id"`
// Severity drives how much the trust score drops.
Severity BehaviorSeverity `json:"severity"`
// Reason is a human-readable description shown in the blacklist warning.
Reason string `json:"reason"`
// Evidence is an optional reference (booking ID, resource Ref, …).
Evidence string `json:"evidence,omitempty"`
// At is the timestamp of the observed misbehavior.
At time.Time `json:"at"`
}