38 Commits

Author SHA1 Message Date
mr
9bb3d897b3 parasite log 2026-04-29 11:56:23 +02:00
mr
47d487ea80 ws token 2026-04-29 07:09:13 +02:00
mr
a8b7d4d0bc debug service + dynamic 2026-04-28 13:24:25 +02:00
mr
7a12506531 live service in oclib 2026-04-28 12:05:53 +02:00
mr
f926a42066 Live x Resource Synergy 2026-04-28 11:48:23 +02:00
mr
e3fbe7688a SelectedEmbeddedStorages 2026-04-28 08:55:08 +02:00
mr
318fd52289 SERVICE_RESOURCE 2026-04-27 13:11:14 +02:00
mr
26fc02c5b2 oclib 2026-04-27 12:52:28 +02:00
mr
f048b420d7 Addon 2026-04-27 11:16:50 +02:00
mr
0b54d6640d purchase From Nano 2026-04-23 12:05:08 +02:00
mr
7b3b9cb7bf master 2026-04-23 11:45:55 +02:00
mr
d9b1ad8dde ToMaster 2026-04-23 11:40:13 +02:00
mr
d6106dacde FromNano 2026-04-23 11:36:39 +02:00
mr
365a1d670c from_nano for booking 2026-04-23 11:19:23 +02:00
mr
25880077d1 pending_master 2026-04-23 11:08:02 +02:00
mr
560c997bf1 pending nano for nano flow. 2026-04-23 10:33:51 +02:00
mr
747368c79a nano 2026-04-23 10:16:13 +02:00
mr
e5e5706834 master role 2026-04-23 10:11:41 +02:00
mr
b9ad5d5ea7 is_nano 2026-04-23 10:04:12 +02:00
mr
e70e89b630 Api Struct + Nano env 2026-04-23 09:48:39 +02:00
mr
9c2663601a Service + Storage Binded to Compute 2026-04-23 09:24:02 +02:00
mr
538496cd60 debug cache 2026-04-22 14:13:28 +02:00
mr
a4366d3a09 follow purchase 2026-04-22 11:54:16 +02:00
mr
51e2dcc404 Load One pb 2026-04-22 11:47:08 +02:00
mr
c208e2ccef sub delete for loop 2026-04-22 11:38:21 +02:00
mr
5cda4fdd40 missed placed 2026-04-22 11:25:32 +02:00
mr
b92634ccba debug extend resource 2026-04-22 11:15:39 +02:00
mr
da237b1d26 oclib 2026-04-22 10:55:52 +02:00
mr
94e3ebbdd9 temp by pass purchase 2026-04-22 10:24:06 +02:00
mr
6741e929cc purchase as string 2026-04-22 09:48:16 +02:00
mr
a08c9b084d GetExtends adjust 2026-04-22 09:18:08 +02:00
mr
17a45eb5d1 GetExtends 2026-04-22 09:14:21 +02:00
mr
0c6efee276 Kick extend treatment 2026-04-21 14:45:04 +02:00
mr
bbaea4fec4 extended for load all + search all 2026-04-21 14:36:52 +02:00
mr
d57ee0b5e7 Extend for Human Readable 2026-04-21 14:30:45 +02:00
mr
50a5e90f33 Native access debug 2026-04-21 08:16:04 +02:00
mr
5cc04ee490 oc-lib 2026-04-17 09:45:00 +02:00
mr
883c0bec3d graph 2026-04-16 15:19:36 +02:00
42 changed files with 1426 additions and 255 deletions

View File

@@ -9,6 +9,9 @@ import "sync"
// ===================================================
type Config struct {
IsApi bool
IsNano bool
APIPort int
NATSUrl string
MongoUrl string
@@ -48,10 +51,13 @@ func GetConfig() *Config {
return instance
}
func SetConfig(mongoUrl string, database string, natsUrl string, lokiUrl string, logLevel string, port int,
func SetConfig(isNano bool, isAPI bool, mongoUrl string, database string, natsUrl string, lokiUrl string, logLevel string, port int,
pkPath, ppPath,
internalCatalogAPI, internalSharedAPI, internalWorkflowAPI, internalWorkspaceAPI,
internalPeerAPI, internalDatacenterAPI string, internalSchedulerAPI string) *Config {
GetConfig().IsNano = isNano
GetConfig().IsApi = isAPI
GetConfig().MongoUrl = mongoUrl
GetConfig().MongoDatabase = database
GetConfig().NATSUrl = natsUrl

View File

@@ -65,6 +65,8 @@ const (
NATIVE_TOOL = tools.NATIVE_TOOL
EXECUTION_VERIFICATION = tools.EXECUTION_VERIFICATION
ALLOWED_IMAGE = tools.ALLOWED_IMAGE
SERVICE_RESOURCE = tools.SERVICE_RESOURCE
LIVE_SERVICE = tools.LIVE_SERVICE
)
func FiltersFromFlatMap(flatMap map[string]interface{}, target interface{}) *dbs.Filters {
@@ -155,6 +157,9 @@ func InitDaemon(appName string) {
// resources.InitNative()
// feed the library with the loaded config
SetConfig(
o.GetBoolDefault("IS_NANO", false),
o.GetBoolDefault("IS_API", true),
o.GetStringDefault("MONGO_URL", "mongodb://127.0.0.1:27017"),
o.GetStringDefault("MONGO_DATABASE", "DC_myDC"),
o.GetStringDefault("NATS_URL", "nats://localhost:4222"),
@@ -172,11 +177,13 @@ func InitDaemon(appName string) {
o.GetStringDefault("INTERNAL_DATACENTER_API", "oc-datacenter"),
o.GetStringDefault("INTERNAL_SCHEDULER_API", "oc-scheduler"),
)
// Beego init
beego.BConfig.AppName = appName
beego.BConfig.Listen.HTTPPort = o.GetIntDefault("port", 8080)
beego.BConfig.WebConfig.DirectoryIndex = true
beego.BConfig.WebConfig.StaticDir["/swagger"] = "swagger"
if config.GetConfig().IsApi {
// Beego init
beego.BConfig.AppName = appName
beego.BConfig.Listen.HTTPPort = o.GetIntDefault("port", 8080)
beego.BConfig.WebConfig.DirectoryIndex = true
beego.BConfig.WebConfig.StaticDir["/swagger"] = "swagger"
}
}
type IDTokenClaims struct {
@@ -196,6 +203,53 @@ type Claims struct {
Session SessionClaims `json:"session"`
}
// GetExtends resolves the extend definitions of every object in objs and
// returns one extended serialization per object, preserving input order.
// A single cache is shared across the whole batch so each referenced
// document is loaded from storage at most once.
func GetExtends(objs []utils.ShallowDBObject, typ ...string) []map[string]interface{} {
	shared := map[tools.DataType]map[string]interface{}{}
	out := make([]map[string]interface{}, 0, len(objs))
	for _, o := range objs {
		out = append(out, GetExtend(o, o.Extend(typ...), shared))
	}
	return out
}
// GetExtend serializes obj and, for every declared extend key, replaces the
// "<key>_id" reference with the full serialized document it points to.
// extends maps an extension key to the candidate DataTypes that may hold the
// referenced document; cache memoizes loaded documents per DataType so the
// same reference is fetched from storage only once across a batch.
func GetExtend(obj utils.DBObject, extends map[string][]tools.DataType, cache map[tools.DataType]map[string]interface{}) map[string]interface{} {
	base := obj.Serialize(obj)
	for k, candidates := range extends {
		raw := base[k+"_id"]
		if raw == nil || raw == "" {
			continue // nothing to resolve for this key
		}
		// Compute the string form of the reference once instead of on every
		// cache probe and store (was re-formatted up to five times per hit).
		id := fmt.Sprintf("%v", raw)
		for _, dt := range candidates {
			// Serve from cache when this document was already loaded.
			if cached := cache[dt]; cached != nil && cached[id] != nil {
				base[k] = cached[id]
				continue // preserve original flow: keep scanning candidate types
			}
			// Load with admin rights: extension is an internal join, not a
			// user-scoped query.
			d, _, err := models.Model(dt.EnumIndex()).GetAccessor(&tools.APIRequest{
				Admin: true,
			}).LoadOne(id)
			if d == nil || err != nil {
				continue // not this type — try the next candidate
			}
			base[k] = d.Serialize(d)
			if cache[dt] == nil {
				cache[dt] = map[string]interface{}{}
			}
			cache[dt][id] = base[k]
			break // resolved; remaining candidate types are irrelevant
		}
	}
	return base
}
// ExtractTokenInfoWs reads the token carried in the Sec-WebSocket-Protocol
// header and returns the caller's user ID, peer ID and group list.
// When the header is absent, empty identifiers and an empty group slice are
// returned.
func ExtractTokenInfoWs(request http.Request) (string, string, []string) {
	token := request.Header.Get("Sec-WebSocket-Protocol")
	if token == "" {
		return "", "", []string{}
	}
	userID := extractFromToken(token, "user_id")
	peerID := extractFromToken(token, "peer_id")
	groups := strings.Split(extractFromToken(token, "groups"), ",")
	return userID, peerID, groups
}
func ExtractTokenInfo(request http.Request) (string, string, []string) {
reqToken := request.Header.Get("Authorization")
splitToken := strings.Split(reqToken, "Bearer ")
@@ -240,19 +294,22 @@ func extractFromToken(token string, attr string) string {
func InitAPI(appName string, extraRoutes ...map[string][]string) {
InitDaemon(appName)
beego.BConfig.Listen.HTTPPort = config.GetConfig().APIPort
beego.BConfig.WebConfig.DirectoryIndex = true
beego.BConfig.WebConfig.StaticDir["/swagger"] = "swagger"
c := cors.Allow(&cors.Options{
AllowAllOrigins: true,
AllowMethods: []string{"GET", "POST", "PUT", "DELETE", "OPTIONS"},
AllowHeaders: []string{"Origin", "Authorization", "Content-Type"},
ExposeHeaders: []string{"Content-Length", "Content-Type"},
AllowCredentials: true,
})
beego.InsertFilter("*", beego.BeforeRouter, c)
api := &tools.API{}
api.Discovered(beego.BeeApp.Handlers.GetAllControllerInfo(), extraRoutes...)
if config.GetConfig().IsApi {
beego.BConfig.Listen.HTTPPort = config.GetConfig().APIPort
beego.BConfig.WebConfig.DirectoryIndex = true
beego.BConfig.WebConfig.StaticDir["/swagger"] = "swagger"
c := cors.Allow(&cors.Options{
AllowAllOrigins: true,
AllowMethods: []string{"GET", "POST", "PUT", "DELETE", "OPTIONS"},
AllowHeaders: []string{"Origin", "Authorization", "Content-Type"},
ExposeHeaders: []string{"Content-Length", "Content-Type"},
AllowCredentials: true,
})
beego.InsertFilter("*", beego.BeforeRouter, c)
api := &tools.API{}
api.Discovered(beego.BeeApp.Handlers.GetAllControllerInfo(), extraRoutes...)
}
}
//
@@ -274,11 +331,11 @@ func GetLogger() zerolog.Logger {
* @param logLevel string
* @return *Config
*/
func SetConfig(mongoUrl string, database string, natsUrl string, lokiUrl string, logLevel string,
func SetConfig(isNano bool, isApi bool, mongoUrl string, database string, natsUrl string, lokiUrl string, logLevel string,
port int, pppath string, pkpath string,
internalCatalogAPI, internalSharedAPI, internalWorkflowAPI,
internalWorkspaceAPI, internalPeerAPI, internalDatacenterAPI string, internalSchedulerAPI string) *config.Config {
cfg := config.SetConfig(mongoUrl, database, natsUrl, lokiUrl, logLevel, port, pkpath, pppath, internalCatalogAPI, internalSharedAPI, internalWorkflowAPI,
cfg := config.SetConfig(isNano, isApi, mongoUrl, database, natsUrl, lokiUrl, logLevel, port, pkpath, pppath, internalCatalogAPI, internalSharedAPI, internalWorkflowAPI,
internalWorkspaceAPI, internalPeerAPI, internalDatacenterAPI, internalSchedulerAPI)
defer func() {
if r := recover(); r != nil {

20
go.mod
View File

@@ -7,6 +7,7 @@ require (
github.com/go-playground/validator/v10 v10.22.0
github.com/google/uuid v1.6.0
github.com/goraz/onion v0.1.3
github.com/libp2p/go-libp2p/core v0.43.0-rc2
github.com/nats-io/nats.go v1.37.0
github.com/rs/zerolog v1.33.0
github.com/stretchr/testify v1.11.1
@@ -22,18 +23,33 @@ require (
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.23.0 // indirect
github.com/google/gnostic-models v0.7.0 // indirect
github.com/ipfs/go-cid v0.5.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/cpuid/v2 v2.2.10 // indirect
github.com/libp2p/go-buffer-pool v0.1.0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/minio/sha256-simd v1.0.1 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect
github.com/mr-tron/base58 v1.2.0 // indirect
github.com/multiformats/go-base32 v0.1.0 // indirect
github.com/multiformats/go-base36 v0.2.0 // indirect
github.com/multiformats/go-multiaddr v0.16.0 // indirect
github.com/multiformats/go-multibase v0.2.0 // indirect
github.com/multiformats/go-multicodec v0.9.1 // indirect
github.com/multiformats/go-multihash v0.2.3 // indirect
github.com/multiformats/go-multistream v0.6.1 // indirect
github.com/multiformats/go-varint v0.0.7 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/x448/float16 v0.8.4 // indirect
go.yaml.in/yaml/v2 v2.4.3 // indirect
go.yaml.in/yaml/v3 v3.0.4 // indirect
golang.org/x/exp v0.0.0-20250606033433-dcc06ee1d476 // indirect
golang.org/x/oauth2 v0.30.0 // indirect
golang.org/x/term v0.37.0 // indirect
golang.org/x/time v0.9.0 // indirect
@@ -42,6 +58,7 @@ require (
k8s.io/klog/v2 v2.130.1 // indirect
k8s.io/kube-openapi v0.0.0-20250910181357-589584f1c912 // indirect
k8s.io/utils v0.0.0-20251002143259-bc988d571ff4 // indirect
lukechampine.com/blake3 v1.4.1 // indirect
sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730 // indirect
sigs.k8s.io/randfill v1.0.0 // indirect
sigs.k8s.io/structured-merge-diff/v6 v6.3.0 // indirect
@@ -67,7 +84,6 @@ require (
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
github.com/libp2p/go-libp2p/core v0.43.0-rc2
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/montanaflynn/stats v0.7.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
@@ -89,3 +105,5 @@ require (
k8s.io/api v0.35.1
k8s.io/client-go v0.35.1
)
replace github.com/libp2p/go-libp2p/core => github.com/libp2p/go-libp2p v0.47.0

2
go.sum
View File

@@ -133,6 +133,8 @@ github.com/multiformats/go-multicodec v0.9.1 h1:x/Fuxr7ZuR4jJV4Os5g444F7xC4XmyUa
github.com/multiformats/go-multicodec v0.9.1/go.mod h1:LLWNMtyV5ithSBUo3vFIMaeDy+h3EbkMTek1m+Fybbo=
github.com/multiformats/go-multihash v0.2.3 h1:7Lyc8XfX/IY2jWb/gI7JP+o7JEq9hOa7BFvVU9RSh+U=
github.com/multiformats/go-multihash v0.2.3/go.mod h1:dXgKXCXjBzdscBLk9JkjINiEsCKRVch90MdaGiKsvSM=
github.com/multiformats/go-multistream v0.6.1 h1:4aoX5v6T+yWmc2raBHsTvzmFhOI8WVOer28DeBBEYdQ=
github.com/multiformats/go-multistream v0.6.1/go.mod h1:ksQf6kqHAb6zIsyw7Zm+gAuVo57Qbq84E27YlYqavqw=
github.com/multiformats/go-varint v0.0.7 h1:sWSGR+f/eu5ABZA2ZpYKBILXTTs9JWpdEM/nEGOHFS8=
github.com/multiformats/go-varint v0.0.7/go.mod h1:r8PUYw/fD/SjBCiKOoDlGF6QawOELpZAu9eioSos/OU=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=

View File

@@ -28,6 +28,20 @@ type Bill struct {
Total float64 `json:"total" bson:"total" validate:"required"`
}
// Extend declares which data types the "order" extension key of a Bill
// resolves to, on top of the extensions inherited from AbstractObject.
func (ri *Bill) Extend(typ ...string) map[string][]tools.DataType {
	ext := ri.AbstractObject.Extend(typ...)
	for _, key := range typ {
		if key != "order" {
			continue
		}
		if _, present := ext[key]; !present {
			ext[key] = []tools.DataType{}
		}
		ext[key] = append(ext[key], tools.ORDER)
	}
	return ext
}
func GenerateBill(order *order.Order, request *tools.APIRequest) (*Bill, error) {
// hhmmm : should get... the loop.
return &Bill{

View File

@@ -13,8 +13,10 @@ import (
* Booking is a struct that represents a booking
*/
type Booking struct {
utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name)
PricedItem map[string]interface{} `json:"priced_item,omitempty" bson:"priced_item,omitempty"` // We need to add the validate:"required" tag once the pricing feature is implemented, removed to avoid handling the error
utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name)
FromNano string `json:"from_nano,omitempty" bson:"from_nano,omitempty"`
PricedItem map[string]interface{} `json:"priced_item,omitempty" bson:"priced_item,omitempty"` // We need to add the validate:"required" tag once the pricing feature is implemented, removed to avoid handling the error
ResumeMetrics map[string]map[string]models.MetricResume `json:"resume_metrics,omitempty" bson:"resume_metrics,omitempty"`
ExecutionMetrics map[string][]models.MetricsSnapshot `json:"metrics,omitempty" bson:"metrics,omitempty"`

View File

@@ -8,16 +8,18 @@ import (
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/models/booking"
"cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/live"
"cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/tools"
)
// InstanceCapacity holds the maximum available resources of a single resource instance.
type InstanceCapacity struct {
CPUCores map[string]float64 `json:"cpu_cores,omitempty"` // model -> total cores
GPUMemGB map[string]float64 `json:"gpu_mem_gb,omitempty"` // model -> total memory GB
RAMGB float64 `json:"ram_gb,omitempty"` // total RAM GB
StorageGB float64 `json:"storage_gb,omitempty"` // total storage GB
CPUCores map[string]float64 `json:"cpu_cores,omitempty"` // model -> total cores
GPUMemGB map[string]float64 `json:"gpu_mem_gb,omitempty"` // model -> total memory GB
RAMGB float64 `json:"ram_gb,omitempty"` // total RAM GB
StorageGB float64 `json:"storage_gb,omitempty"` // total storage GB
MaxConcurrent float64 `json:"max_concurrent,omitempty"` // HOSTED service: max simultaneous callers
}
// ResourceRequest describes the resource amounts needed for a prospective booking.
@@ -47,11 +49,14 @@ type PlannerITF interface {
}
// Planner is a volatile (non-persisted) object that organises bookings by resource.
// Only ComputeResource and StorageResource bookings appear in the schedule.
// ComputeResource, StorageResource and HOSTED ServiceResource bookings appear in the schedule.
// BlockedResources marks resources for which no matching Live was found at generation time:
// any availability check against a blocked resource returns false immediately.
type Planner struct {
GeneratedAt time.Time `json:"generated_at"`
Schedule map[string][]*PlannerSlot `json:"schedule"` // resource_id -> slots
Capacities map[string]map[string]*InstanceCapacity `json:"capacities"` // resource_id -> instance_id -> max capacity
GeneratedAt time.Time `json:"generated_at"`
Schedule map[string][]*PlannerSlot `json:"schedule"` // resource_id -> slots
Capacities map[string]map[string]*InstanceCapacity `json:"capacities"` // resource_id -> instance_id -> max capacity
BlockedResources map[string]bool `json:"blocked_resources,omitempty"` // resource_id -> no Live found
}
// Generate builds a full Planner from all active bookings.
@@ -86,9 +91,10 @@ func generate(request *tools.APIRequest, shallow bool) (*Planner, error) {
bookings := append(confirmed, drafts...)
p := &Planner{
GeneratedAt: time.Now(),
Schedule: map[string][]*PlannerSlot{},
Capacities: map[string]map[string]*InstanceCapacity{},
GeneratedAt: time.Now(),
Schedule: map[string][]*PlannerSlot{},
Capacities: map[string]map[string]*InstanceCapacity{},
BlockedResources: map[string]bool{},
}
for _, b := range bookings {
@@ -100,8 +106,10 @@ func generate(request *tools.APIRequest, shallow bool) (*Planner, error) {
continue
}
// Only compute and storage resources are eligible
if bk.ResourceType != tools.COMPUTE_RESOURCE && bk.ResourceType != tools.STORAGE_RESOURCE {
// Eligible resource types: compute, storage, and HOSTED services.
if bk.ResourceType != tools.COMPUTE_RESOURCE &&
bk.ResourceType != tools.STORAGE_RESOURCE &&
bk.ResourceType != tools.SERVICE_RESOURCE {
continue
}
@@ -111,7 +119,11 @@ func generate(request *tools.APIRequest, shallow bool) (*Planner, error) {
end = &e
}
instanceID, usage, cap := extractSlotData(bk, request)
instanceID, usage, cap, blocked := extractSlotData(bk, request)
if blocked {
p.BlockedResources[bk.ResourceID] = true
continue
}
if instanceID == "" {
instanceID = bk.InstanceID
}
@@ -151,6 +163,9 @@ func generate(request *tools.APIRequest, shallow bool) (*Planner, error) {
// Slots targeting other instances are ignored.
// If no capacity is known for this instance (never booked), it is fully available.
func (p *Planner) Check(resourceID string, instanceID string, req *ResourceRequest, start time.Time, end *time.Time) bool {
if p.BlockedResources[resourceID] {
return false // no Live found at generation time — cannot book
}
if end == nil {
e := start.Add(5 * time.Minute)
end = &e
@@ -265,6 +280,11 @@ func toPercentages(req *ResourceRequest, cap *InstanceCapacity) map[string]float
pct["storage"] = (*req.StorageGB / cap.StorageGB) * 100.0
}
// HOSTED service: each booking consumes one call slot.
if cap.MaxConcurrent > 0 {
pct["calls"] = (1.0 / cap.MaxConcurrent) * 100.0
}
return pct
}
@@ -272,9 +292,11 @@ func toPercentages(req *ResourceRequest, cap *InstanceCapacity) map[string]float
// Internal helpers
// ---------------------------------------------------------------------------
// extractSlotData parses the booking's PricedItem, loads the corresponding resource,
// and returns the instance ID, usage percentages, and instance capacity in a single pass.
func extractSlotData(bk *booking.Booking, request *tools.APIRequest) (instanceID string, usage map[string]float64, cap *InstanceCapacity) {
// extractSlotData parses the booking's PricedItem, loads the corresponding Live resource
// as the authoritative capacity source, and returns the instance ID, usage percentages,
// capacity, and whether a matching Live was found.
// blocked=true means no Live exists for this resource; the resource must not be scheduled.
func extractSlotData(bk *booking.Booking, request *tools.APIRequest) (instanceID string, usage map[string]float64, cap *InstanceCapacity, blocked bool) {
usage = map[string]float64{}
if len(bk.PricedItem) == 0 {
return
@@ -289,6 +311,8 @@ func extractSlotData(bk *booking.Booking, request *tools.APIRequest) (instanceID
instanceID, usage, cap = extractComputeSlot(b, bk.ResourceID, request)
case tools.STORAGE_RESOURCE:
instanceID, usage, cap = extractStorageSlot(b, bk.ResourceID, request)
case tools.SERVICE_RESOURCE:
instanceID, usage, cap, blocked = extractServiceSlot(b, bk.ResourceID, request)
}
return
}
@@ -381,6 +405,51 @@ func extractStorageSlot(pricedJSON []byte, resourceID string, request *tools.API
return
}
// extractServiceSlot extracts the instance ID, usage, and capacity for a HOSTED service booking.
// The LiveService is the authoritative source for MaxConcurrent — the ServiceResource is not trusted.
// If no LiveService references this resourceID, blocked=true signals the resource cannot be scheduled.
func extractServiceSlot(pricedJSON []byte, resourceID string, request *tools.APIRequest) (instanceID string, usage map[string]float64, cap *InstanceCapacity, blocked bool) {
	usage = map[string]float64{}
	var priced resources.PricedServiceResource
	if err := json.Unmarshal(pricedJSON, &priced); err != nil {
		// Unparseable priced item: treat as unschedulable rather than guessing.
		blocked = true
		return
	}
	// LiveService is the authoritative capacity source — look it up by resources_id.
	// Only the first match is needed, hence the limit of 1.
	liveResults, _, err := (&live.LiveService{}).GetAccessor(request).Search(
		&dbs.Filters{
			And: map[string][]dbs.Filter{
				"resources_id": {{Operator: dbs.EQUAL.String(), Value: resourceID}},
			},
		}, "*", false, 0, 1)
	if err != nil || len(liveResults) == 0 {
		blocked = true // no Live → cannot schedule
		return
	}
	ls := liveResults[0].(*live.LiveService)
	if ls.MaxConcurrent <= 0 {
		// A non-positive cap means the service accepts no callers at all.
		blocked = true
		return
	}
	// Instance ID: use the priced item's own ID when it has one.
	instanceID = priced.GetID()
	if instanceID == "" {
		instanceID = resourceID // fallback: treat the resource itself as the instance key
	}
	maxC := float64(ls.MaxConcurrent)
	// Services have no CPU/RAM/storage dimensions here: call slots are the
	// only capacity that matters for scheduling.
	cap = &InstanceCapacity{
		CPUCores:      map[string]float64{},
		GPUMemGB:      map[string]float64{},
		MaxConcurrent: maxC,
	}
	// Each booking consumes exactly one call slot, expressed as a percentage of capacity.
	usage["calls"] = (1.0 / maxC) * 100.0
	return
}
// findComputeInstance returns the instance referenced by the priced item's InstancesRefs,
// falling back to the first available instance.
func findComputeInstance(compute *resources.ComputeResource, refs map[string]string) *resources.ComputeResourceInstance {

View File

@@ -27,16 +27,18 @@ func GetPlannerNearestStart(start time.Time, planned map[tools.DataType]map[stri
return near
}
// GetPlannerLongestTime returns the sum of all processing durations (conservative estimate).
// Returns -1 if any processing is a service (open-ended).
// GetPlannerLongestTime returns the sum of all processing+service durations.
// Returns -1 if any item is open-ended (no deadline).
func GetPlannerLongestTime(planned map[tools.DataType]map[string]pricing.PricedItemITF) float64 {
longestTime := float64(0)
for _, priced := range planned[tools.PROCESSING_RESOURCE] {
d := priced.GetExplicitDurationInS()
if d < 0 {
return -1 // service present: booking is open-ended
for _, dt := range []tools.DataType{tools.PROCESSING_RESOURCE, tools.SERVICE_RESOURCE} {
for _, priced := range planned[dt] {
d := priced.GetExplicitDurationInS()
if d < 0 {
return -1
}
longestTime += d
}
longestTime += d
}
return longestTime
}

View File

@@ -18,6 +18,20 @@ type ExecutionVerification struct {
Validate bool `json:"validate" bson:"validate,omitempty"`
}
// Extend declares which data types the "workflow" extension key of an
// ExecutionVerification resolves to, on top of the extensions inherited
// from AbstractObject.
func (ri *ExecutionVerification) Extend(typ ...string) map[string][]tools.DataType {
	ext := ri.AbstractObject.Extend(typ...)
	for _, t := range typ {
		switch t {
		// Fixed: the case label was misspelled "wokflow", so requesting the
		// extension with the correct "workflow" key could never match.
		case "workflow":
			if _, ok := ext[t]; !ok {
				ext[t] = []tools.DataType{}
			}
			ext[t] = append(ext[t], tools.WORKFLOW)
		}
	}
	return ext
}
func (r *ExecutionVerification) StoreDraftDefault() {
r.IsDraft = false // TODO: TEMPORARY
}

View File

@@ -1,18 +1,13 @@
package live
import (
"cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
)
type LiveInterface interface {
utils.DBObject
IsCompatible(service map[string]interface{}) bool
GetMonitorPath() string
GetResourcesID() []string
SetResourcesID(string)
GetResourceAccessor(request *tools.APIRequest) utils.Accessor
GetResource() resources.ResourceInterface
GetResourceInstance() resources.ResourceInstanceITF
SetResourceInstance(res resources.ResourceInterface, i resources.ResourceInstanceITF) resources.ResourceInterface
}

View File

@@ -3,7 +3,6 @@ package live
import (
"slices"
"cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
"github.com/biter777/countries"
@@ -32,18 +31,41 @@ type LiveCerts struct {
}
// TODO in the future multiple type of certs depending of infra type
type GeoPoint struct {
Latitude float64 `json:"latitude,omitempty" bson:"latitude,omitempty"`
Longitude float64 `json:"longitude,omitempty" bson:"longitude,omitempty"`
}
type AbstractLive struct {
utils.AbstractObject
Certs LiveCerts `json:"certs,omitempty" bson:"certs,omitempty"`
MonitorPath string `json:"monitor_path,omitempty" bson:"monitor_path,omitempty"`
Location resources.GeoPoint `json:"location,omitempty" bson:"location,omitempty"`
Location GeoPoint `json:"location,omitempty" bson:"location,omitempty"`
Country countries.CountryCode `json:"country,omitempty" bson:"country,omitempty"`
AccessProtocol string `json:"access_protocol,omitempty" bson:"access_protocol,omitempty"`
ResourcesID []string `json:"resources_id" bson:"resources_id"`
}
// Extend declares that the "resource" extension key of a live object may
// resolve to any of the schedulable resource data types, on top of the
// extensions inherited from AbstractObject.
func (ri *AbstractLive) Extend(typ ...string) map[string][]tools.DataType {
	ext := ri.AbstractObject.Extend(typ...)
	resourceTypes := []tools.DataType{
		tools.WORKFLOW_RESOURCE,
		tools.DATA_RESOURCE,
		tools.COMPUTE_RESOURCE,
		tools.STORAGE_RESOURCE,
		tools.PROCESSING_RESOURCE,
		tools.SERVICE_RESOURCE,
	}
	for _, key := range typ {
		if key != "resource" {
			continue
		}
		if _, present := ext[key]; !present {
			ext[key] = []tools.DataType{}
		}
		ext[key] = append(ext[key], resourceTypes...)
	}
	return ext
}
// GetMonitorPath returns the path at which this live object can be monitored.
func (d *AbstractLive) GetMonitorPath() string {
	return d.MonitorPath
}
@@ -52,9 +74,9 @@ func (d *AbstractLive) GetResourcesID() []string {
return d.ResourcesID
}
func (d *AbstractLive) SetResourcesID(id string) {
if !slices.Contains(d.ResourcesID, id) {
d.ResourcesID = append(d.ResourcesID, id)
// SetResourcesID registers a resource ID on this live object, keeping
// ResourcesID duplicate-free: the ID is appended only when not already
// present.
// Fixed: the containment check was inverted (missing '!'), so new IDs were
// never appended and an already-present ID would have been duplicated.
func (d *AbstractLive) SetResourcesID(resourcesid string) {
	if !slices.Contains(d.ResourcesID, resourcesid) {
		d.ResourcesID = append(d.ResourcesID, resourcesid)
	}
}

View File

@@ -3,7 +3,6 @@ package live
import (
"cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/common/models"
"cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
)
@@ -12,39 +11,34 @@ import (
* LiveDatacenter is a struct that represents a compute units in your datacenters
*/
type ComputeNode struct {
Name string `json:"name,omitempty" bson:"name,omitempty"`
Quantity int64 `json:"quantity" bson:"quantity" default:"1"`
RAM *models.RAM `bson:"ram,omitempty" json:"ram,omitempty"` // RAM is the RAM
CPUs map[string]int64 `bson:"cpus,omitempty" json:"cpus,omitempty"` // CPUs is the list of CPUs key is model
GPUs map[string]int64 `bson:"gpus,omitempty" json:"gpus,omitempty"` // GPUs is the list of GPUs key is model
}
type LiveDatacenter struct {
AbstractLive
StorageType enum.StorageType `bson:"storage_type" json:"storage_type" default:"-1"` // Type is the type of the storage
Acronym string `bson:"acronym,omitempty" json:"acronym,omitempty"` // Acronym is the acronym of the storage
Architecture string `json:"architecture,omitempty" bson:"architecture,omitempty"` // Architecture is the architecture
Infrastructure enum.InfrastructureType `json:"infrastructure" bson:"infrastructure" default:"-1"` // Infrastructure is the infrastructure
Source string `json:"source,omitempty" bson:"source,omitempty"` // Source is the source of the resource
SecurityLevel string `json:"security_level,omitempty" bson:"security_level,omitempty"`
PowerSources []string `json:"power_sources,omitempty" bson:"power_sources,omitempty"`
AnnualCO2Emissions float64 `json:"annual_co2_emissions,omitempty" bson:"co2_emissions,omitempty"`
CPUs map[string]*models.CPU `bson:"cpus,omitempty" json:"cpus,omitempty"` // CPUs is the list of CPUs key is model
GPUs map[string]*models.GPU `bson:"gpus,omitempty" json:"gpus,omitempty"` // GPUs is the list of GPUs key is model
Nodes []*resources.ComputeNode `json:"nodes,omitempty" bson:"nodes,omitempty"`
Architecture string `json:"architecture,omitempty" bson:"architecture,omitempty"` // Architecture is the architecture
Infrastructure enum.InfrastructureType `json:"infrastructure" bson:"infrastructure" default:"-1"` // Infrastructure is the infrastructure
Source string `json:"source,omitempty" bson:"source,omitempty"` // Source is the source of the resource
SecurityLevel string `json:"security_level,omitempty" bson:"security_level,omitempty"`
PowerSources []string `json:"power_sources,omitempty" bson:"power_sources,omitempty"`
AnnualCO2Emissions float64 `json:"annual_co2_emissions,omitempty" bson:"co2_emissions,omitempty"`
CPUs map[string]*models.CPU `bson:"cpus,omitempty" json:"cpus,omitempty"` // CPUs is the list of CPUs key is model
GPUs map[string]*models.GPU `bson:"gpus,omitempty" json:"gpus,omitempty"` // GPUs is the list of GPUs key is model
Nodes []*ComputeNode `json:"nodes,omitempty" bson:"nodes,omitempty"`
}
// IsCompatible reports whether the given service description matches this
// datacenter's infrastructure and architecture.
// NOTE(review): the values are compared with interface{} equality — if the
// service map comes from decoded JSON, numeric fields arrive as float64 and
// will never equal the enum.InfrastructureType here; confirm callers build
// this map with the same concrete types.
func (r *LiveDatacenter) IsCompatible(service map[string]interface{}) bool {
	return service["infrastructure"] == r.Infrastructure && service["architecture"] == r.Architecture
}
func (d *LiveDatacenter) GetAccessor(request *tools.APIRequest) utils.Accessor {
return NewAccessor[*LiveDatacenter](tools.LIVE_DATACENTER, request) // Create a new instance of the accessor
}
func (d *LiveDatacenter) GetResourceAccessor(request *tools.APIRequest) utils.Accessor {
return resources.NewAccessor[*resources.ComputeResource](tools.COMPUTE_RESOURCE, request)
}
func (d *LiveDatacenter) GetResource() resources.ResourceInterface {
return &resources.ComputeResource{}
}
func (d *LiveDatacenter) GetResourceInstance() resources.ResourceInstanceITF {
return &resources.ComputeResourceInstance{}
}
func (d *LiveDatacenter) SetResourceInstance(res resources.ResourceInterface, i resources.ResourceInstanceITF) resources.ResourceInterface {
r := res.(*resources.ComputeResource)
r.Instances = append(r.Instances, i.(*resources.ComputeResourceInstance))
return r
}

View File

@@ -1,9 +1,6 @@
package live
import (
"encoding/json"
"errors"
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/logs"
"cloud.o-forge.io/core/oc-lib/models/utils"
@@ -27,70 +24,16 @@ func NewAccessor[T LiveInterface](t tools.DataType, request *tools.APIRequest) *
return &LiveDatacenter{}
case tools.LIVE_STORAGE:
return &LiveStorage{}
case tools.LIVE_SERVICE:
return &LiveService{}
}
return &LiveDatacenter{}
},
NotImplemented: []string{"CopyOne"},
},
}
}
/*
* Nothing special here, just the basic CRUD operations
*/
func (a *liveMongoAccessor[T]) CopyOne(data utils.DBObject) (utils.DBObject, int, error) {
// is a publisher... that become a resources.
if data.IsDrafted() {
return nil, 422, errors.New("can't publish a drafted compute units")
}
live := data.(T)
/*if live.GetMonitorPath() == "" || live.GetID() != "" {
return nil, 422, errors.New("publishing is only allowed is it can be monitored and be accessible")
}*/
if res, code, err := a.LoadOne(live.GetID()); err != nil {
return nil, code, err
} else {
live = res.(T)
}
resAccess := live.GetResourceAccessor(a.Request)
instance := live.GetResourceInstance()
b, _ := json.Marshal(live)
json.Unmarshal(b, instance)
if len(live.GetResourcesID()) > 0 {
for _, r := range live.GetResourcesID() {
res, code, err := resAccess.LoadOne(r)
if err == nil {
return nil, code, err
}
existingResource := live.GetResource()
b, _ := json.Marshal(res)
json.Unmarshal(b, existingResource)
live.SetResourceInstance(existingResource, instance)
resAccess.UpdateOne(existingResource.Serialize(existingResource), existingResource.GetID())
}
if live.GetID() != "" {
return a.LoadOne(live.GetID())
} else {
return a.StoreOne(live)
}
} else {
r := live.GetResource()
b, _ := json.Marshal(live)
json.Unmarshal(b, &r)
live.SetResourceInstance(r, instance)
res, code, err := utils.GenericStoreOne(r, resAccess)
if err != nil {
return nil, code, err
}
live.SetResourcesID(res.GetID())
if live.GetID() != "" {
return a.UpdateOne(live.Serialize(live), live.GetID())
} else {
return a.StoreOne(live)
}
}
}
// LoadAll returns every live object of this collection as shallow objects,
// honouring the draft flag and the usual pagination window.
func (a *liveMongoAccessor[T]) LoadAll(isDraft bool, offset int64, limit int64) ([]utils.ShallowDBObject, int, error) {
	exec := a.GetExec(isDraft)
	return utils.GenericLoadAll[T](exec, isDraft, a, offset, limit)
}

View File

@@ -0,0 +1,42 @@
package live
import (
"cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
)
// ServiceProtocol enumerates the wire protocols a LiveService can expose.
type ServiceProtocol int

const (
	HTTP ServiceProtocol = iota
	GRPC
	WEBSOCKET
	TCP
)

// serviceProtocolNames holds the canonical name of each protocol, indexed by value.
var serviceProtocolNames = [...]string{"HTTP", "GRPC", "WEBSOCKET", "TCP"}

// String returns the canonical upper-case name of the protocol.
// Out-of-range values yield "UNKNOWN" instead of panicking (the original
// indexed the array directly, which panics on any invalid stored value).
func (p ServiceProtocol) String() string {
	if p < 0 || int(p) >= len(serviceProtocolNames) {
		return "UNKNOWN"
	}
	return serviceProtocolNames[p]
}
// LiveService is the authoritative description of a hosted service run by the peer.
// MaxConcurrent is the only capacity dimension that matters for scheduling:
// it caps the number of simultaneous callers the service can accept.
// All other service metadata (endpoint, protocol) is live-verified here
// rather than trusted from the ServiceResource, which may be stale.
type LiveService struct {
	AbstractLive
	MaxConcurrent   int                     `json:"max_concurrent" bson:"max_concurrent"`                           // cap on simultaneous callers
	Protocol        ServiceProtocol         `json:"protocol" bson:"protocol" default:"0"`                           // wire protocol; default 0 = HTTP
	EndpointPattern string                  `json:"endpoint_pattern,omitempty" bson:"endpoint_pattern,omitempty"`   // address/URL pattern of the exposed endpoint
	HealthCheckPath string                  `json:"health_check_path,omitempty" bson:"health_check_path,omitempty"` // path polled to verify liveness
	Infrastructure  enum.InfrastructureType `json:"infrastructure" bson:"infrastructure" default:"-1"`              // hosting infrastructure kind; -1 = unset
}
// GetAccessor returns a live-collection accessor bound to the LIVE_SERVICE data type.
func (d *LiveService) GetAccessor(request *tools.APIRequest) utils.Accessor {
	return NewAccessor[*LiveService](tools.LIVE_SERVICE, request)
}
// IsCompatible reports whether the given service description targets the same
// infrastructure as this live service.
// NOTE(review): service["infrastructure"] is an interface{}; if the map was
// decoded from JSON the value is float64, which can never compare equal to an
// enum.InfrastructureType — confirm callers pass the enum type directly.
func (r *LiveService) IsCompatible(service map[string]interface{}) bool {
	return service["infrastructure"] == r.Infrastructure
}

View File

@@ -2,7 +2,6 @@ package live
import (
"cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
)
@@ -14,6 +13,7 @@ import (
type LiveStorage struct {
AbstractLive
StorageType enum.StorageType `bson:"storage_type" json:"storage_type" default:"-1"`
Source string `bson:"source,omitempty" json:"source,omitempty"` // Source is the source of the storage
Path string `bson:"path,omitempty" json:"path,omitempty"` // Path is the store folders in the source
Local bool `bson:"local" json:"local"`
@@ -25,22 +25,10 @@ type LiveStorage struct {
Throughput string `bson:"throughput,omitempty" json:"throughput,omitempty"` // Throughput is the throughput of the storage
}
// IsCompatible reports whether the given service description matches this live
// storage's storage type.
// NOTE(review): service["storage_type"] is an interface{}; if the map was
// decoded from JSON the value is float64, which can never compare equal to an
// enum.StorageType — confirm callers pass the enum type directly.
func (r *LiveStorage) IsCompatible(service map[string]interface{}) bool {
	return service["storage_type"] == r.StorageType
}
// GetAccessor returns a live-collection accessor bound to the LIVE_STORAGE data type.
func (d *LiveStorage) GetAccessor(request *tools.APIRequest) utils.Accessor {
	return NewAccessor[*LiveStorage](tools.LIVE_STORAGE, request) // Create a new instance of the accessor
}
// GetResourceAccessor returns the catalog accessor used to persist the
// StorageResource generated when this live storage is published.
func (d *LiveStorage) GetResourceAccessor(request *tools.APIRequest) utils.Accessor {
	return resources.NewAccessor[*resources.StorageResource](tools.STORAGE_RESOURCE, request)
}
// GetResource returns an empty StorageResource to be filled from this live object.
func (d *LiveStorage) GetResource() resources.ResourceInterface {
	return &resources.StorageResource{}
}
// GetResourceInstance returns an empty StorageResourceInstance to be filled
// from this live object.
func (d *LiveStorage) GetResourceInstance() resources.ResourceInstanceITF {
	return &resources.StorageResourceInstance{}
}
// SetResourceInstance appends the given instance to the storage resource and
// returns the updated resource.
func (d *LiveStorage) SetResourceInstance(res resources.ResourceInterface, i resources.ResourceInstanceITF) resources.ResourceInterface {
	storage := res.(*resources.StorageResource)
	inst := i.(*resources.StorageResourceInstance)
	storage.Instances = append(storage.Instances, inst)
	return storage
}

View File

@@ -79,18 +79,6 @@ func TestLiveDatacenter_GetAccessor_NilRequest(t *testing.T) {
assert.NotNil(t, acc)
}
func TestLiveDatacenter_GetResource(t *testing.T) {
dc := &live.LiveDatacenter{}
res := dc.GetResource()
assert.NotNil(t, res)
}
func TestLiveDatacenter_GetResourceInstance(t *testing.T) {
dc := &live.LiveDatacenter{}
inst := dc.GetResourceInstance()
assert.NotNil(t, inst)
}
func TestLiveDatacenter_IDAndName(t *testing.T) {
dc := &live.LiveDatacenter{}
dc.AbstractLive.AbstractObject = utils.AbstractObject{UUID: "dc-id", Name: "dc-name"}
@@ -124,18 +112,6 @@ func TestLiveStorage_GetAccessor(t *testing.T) {
assert.NotNil(t, acc)
}
func TestLiveStorage_GetResource(t *testing.T) {
s := &live.LiveStorage{}
res := s.GetResource()
assert.NotNil(t, res)
}
func TestLiveStorage_GetResourceInstance(t *testing.T) {
s := &live.LiveStorage{}
inst := s.GetResourceInstance()
assert.NotNil(t, inst)
}
func TestLiveStorage_SetResourcesID_NoDuplication(t *testing.T) {
s := &live.LiveStorage{}
s.SetResourcesID("storage-1")

View File

@@ -8,6 +8,7 @@ import (
"cloud.o-forge.io/core/oc-lib/models/live"
"cloud.o-forge.io/core/oc-lib/models/order"
"cloud.o-forge.io/core/oc-lib/models/resources/purchase_resource"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
"cloud.o-forge.io/core/oc-lib/models/booking"
@@ -15,7 +16,6 @@ import (
"cloud.o-forge.io/core/oc-lib/models/collaborative_area/rules/rule"
"cloud.o-forge.io/core/oc-lib/models/peer"
resource "cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/models/utils"
w2 "cloud.o-forge.io/core/oc-lib/models/workflow"
"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
w3 "cloud.o-forge.io/core/oc-lib/models/workspace"
@@ -31,6 +31,7 @@ var ModelsCatalog = map[string]func() utils.DBObject{
tools.COMPUTE_RESOURCE.String(): func() utils.DBObject { return &resource.ComputeResource{} },
tools.STORAGE_RESOURCE.String(): func() utils.DBObject { return &resource.StorageResource{} },
tools.PROCESSING_RESOURCE.String(): func() utils.DBObject { return &resource.ProcessingResource{} },
tools.SERVICE_RESOURCE.String(): func() utils.DBObject { return &resource.ServiceResource{} },
tools.NATIVE_TOOL.String(): func() utils.DBObject { return &resource.NativeTool{} },
tools.WORKFLOW.String(): func() utils.DBObject { return &w2.Workflow{} },
tools.WORKFLOW_EXECUTION.String(): func() utils.DBObject { return &workflow_execution.WorkflowExecution{} },
@@ -45,6 +46,7 @@ var ModelsCatalog = map[string]func() utils.DBObject{
tools.PURCHASE_RESOURCE.String(): func() utils.DBObject { return &purchase_resource.PurchaseResource{} },
tools.LIVE_DATACENTER.String(): func() utils.DBObject { return &live.LiveDatacenter{} },
tools.LIVE_STORAGE.String(): func() utils.DBObject { return &live.LiveStorage{} },
tools.LIVE_SERVICE.String(): func() utils.DBObject { return &live.LiveService{} },
tools.BILL.String(): func() utils.DBObject { return &bill.Bill{} },
tools.EXECUTION_VERIFICATION.String(): func() utils.DBObject { return &execution_verification.ExecutionVerification{} },
tools.ALLOWED_IMAGE.String(): func() utils.DBObject { return &allowed_image.AllowedImage{} },

View File

@@ -17,10 +17,10 @@ import (
type Order struct {
utils.AbstractObject
ExecutionsID string `json:"executions_id" bson:"executions_id" validate:"required"`
Status enum.CompletionStatus `json:"status" bson:"status" default:"0"`
Purchases []*purchase_resource.PurchaseResource `json:"purchases" bson:"purchases"`
Bookings []*booking.Booking `json:"bookings" bson:"bookings"`
ExecutionsID string `json:"executions_id" bson:"executions_id" validate:"required"`
Status enum.CompletionStatus `json:"status" bson:"status" default:"0"`
Purchases []*purchase_resource.PurchaseResource `json:"purchases" bson:"purchases"`
Bookings []*booking.Booking `json:"bookings" bson:"bookings"`
Billing map[pricing.BillingStrategy][]*booking.Booking `json:"billing" bson:"billing"`
}

View File

@@ -7,6 +7,15 @@ import (
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
"github.com/biter777/countries"
)
// PeerPerm enumerates the permissions that can be granted to a peer
// (see Peer.PeerPerms).
type PeerPerm int

const (
	// BUG FIX: these constants were declared with type PeerRelation, which made
	// them unusable in the Peer.PeerPerms []PeerPerm field without an explicit
	// conversion and collided with the PeerRelation value space.
	READ PeerPerm = iota
	WRITE
	MONITOR
)
type PeerRelation int
@@ -17,9 +26,13 @@ const (
PARTNER
BLACKLIST
PENDING_PARTNER
MASTER
NANO
PENDING_NANO
PENDING_MASTER
)
var path = []string{"unknown", "self", "partner", "blacklist", "partner"}
var path = []string{"unknown", "self", "partner", "blacklist", "partner", "pending_partner", "master", "nano", "pending_nano", "pending_master"}
func GetRelationPath(str string) int {
for i, p := range path {
@@ -66,11 +79,20 @@ type PeerLocation struct {
Latitude float64 `json:"latitude" bson:"latitude"`
Longitude float64 `json:"longitude" bson:"longitude"`
Granularity int `json:"granularity" bson:"granularity"`
Country countries.CountryCode `json:"country,omitempty" bson:"country,omitempty"`
Timezone string `json:"timezone,omitempty" bson:"timezone,omitempty"`
}
// Peer is a struct that represents a peer
type Peer struct {
utils.AbstractObject
IsNano bool `json:"is_nano" bson:"is_nano"`
PeerPerms []PeerPerm `json:"peer_perms" bson:"peer_perms"`
RelationLastChangeDate time.Time `json:"relation_last_change_date" bson:"relation_last_change_date"`
RelationLastChangeUser string `json:"relation_last_change_user" bson:"relation_last_change_user"`
Verify bool `json:"verify" bson:"verify"`
OrganizationID string `json:"organization_id" bson:"organization_id"`
@@ -92,6 +114,26 @@ type Peer struct {
TrustScore float64 `json:"trust_score" bson:"trust_score" default:"100"`
BlacklistReason string `json:"blacklist_reason,omitempty" bson:"blacklist_reason,omitempty"`
BehaviorWarnings []BehaviorWarning `json:"behavior_warnings,omitempty" bson:"behavior_warnings,omitempty"`
// Volatile connectivity state — never persisted to DB (bson:"-").
// Set in-memory by oc-peer when it receives a PEER_OBSERVE_RESPONSE_EVENT.
// Considered offline when LastHeartbeat is older than 60 s (30 s interval + 30 s grace).
Online bool `json:"online" bson:"-"`
LastHeartbeat *time.Time `json:"last_heartbeat,omitempty" bson:"-"`
}
// Extend augments the base extension map: every requested "peer" extension
// additionally resolves to the PEER data type.
func (p *Peer) Extend(typ ...string) map[string][]tools.DataType {
	extensions := p.AbstractObject.Extend(typ...)
	for _, key := range typ {
		if key != "peer" {
			continue
		}
		if _, present := extensions[key]; !present {
			extensions[key] = []tools.DataType{}
		}
		extensions[key] = append(extensions[key], tools.PEER)
	}
	return extensions
}
func (ao *Peer) VerifyAuth(callName string, request *tools.APIRequest) bool {

View File

@@ -9,6 +9,7 @@ import (
"cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/common/models"
"cloud.o-forge.io/core/oc-lib/models/common/pricing"
"cloud.o-forge.io/core/oc-lib/models/live"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
"github.com/google/uuid"
@@ -46,23 +47,18 @@ func (abs *ComputeResource) ConvertToPricedResource(t tools.DataType, selectedIn
}, nil
}
type ComputeNode struct {
Name string `json:"name,omitempty" bson:"name,omitempty"`
Quantity int64 `json:"quantity" bson:"quantity" default:"1"`
RAM *models.RAM `bson:"ram,omitempty" json:"ram,omitempty"` // RAM is the RAM
CPUs map[string]int64 `bson:"cpus,omitempty" json:"cpus,omitempty"` // CPUs is the list of CPUs key is model
GPUs map[string]int64 `bson:"gpus,omitempty" json:"gpus,omitempty"` // GPUs is the list of GPUs key is model
}
type ComputeResourceInstance struct {
ResourceInstance[*ComputeResourcePartnership]
Source string `json:"source,omitempty" bson:"source,omitempty"` // Source is the source of the resource
Source string `json:"source,omitempty" bson:"source,omitempty"`
SecurityLevel string `json:"security_level,omitempty" bson:"security_level,omitempty"`
PowerSources []string `json:"power_sources,omitempty" bson:"power_sources,omitempty"`
AnnualCO2Emissions float64 `json:"annual_co2_emissions,omitempty" bson:"co2_emissions,omitempty"`
CPUs map[string]*models.CPU `bson:"cpus,omitempty" json:"cpus,omitempty"` // CPUs is the list of CPUs key is model
GPUs map[string]*models.GPU `bson:"gpus,omitempty" json:"gpus,omitempty"` // GPUs is the list of GPUs key is model
Nodes []*ComputeNode `json:"nodes,omitempty" bson:"nodes,omitempty"`
CPUs map[string]*models.CPU `bson:"cpus,omitempty" json:"cpus,omitempty"`
GPUs map[string]*models.GPU `bson:"gpus,omitempty" json:"gpus,omitempty"`
Nodes []*live.ComputeNode `json:"nodes,omitempty" bson:"nodes,omitempty"`
// AvailableStorages lists storage capabilities activatable on this compute unit (e.g. Minio, local volumes).
// These are shallow StorageResource entries — not independent catalog items — but carry full pricing structure.
AvailableStorages []*StorageResource `json:"available_storages,omitempty" bson:"available_storages,omitempty"`
}
// IsPeerless is always false for compute instances: a compute resource is

298
models/resources/dynamic.go Executable file
View File

@@ -0,0 +1,298 @@
package resources
import (
"encoding/json"
"errors"
"fmt"
"reflect"
"slices"
"strings"
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/models/common/pricing"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
)
/*
 * DynamicResource is a resource resolved at runtime: instead of pointing at a
 * fixed catalog entry it carries a target resource type plus filters, and
 * materialises the matching instances on demand (see SetAllowedInstances).
 */
type DynamicResource struct {
	AbstractResource
	Type                     tools.DataType         `bson:"type,omitempty" json:"type,omitempty"`                 // concrete resource type the filters are applied to
	Filters                  map[string]interface{} `bson:"filters,omitempty" json:"filters,omitempty"`           // flat field -> value filters used to search the catalog
	SortRules                map[string]string      `bson:"rules,omitempty" json:"rules,omitempty"`               // attribute path -> comparison-rule template (see compareByRules)
	PeerIds                  map[int]string         `bson:"peer_ids,omitempty" json:"peer_ids,omitempty"`         // instance index -> creator peer ID
	ResourceIds              map[int]string         `bson:"resource_ids,omitempty" json:"resource_ids,omitempty"` // instance index -> source resource ID
	SelectedIndex            int                    `bson:"selected_index,omitempty" json:"selected_index,omitempty"`
	SelectedPartnershipIndex *int                   `bson:"selected_partnership_index,omitempty" json:"selected_partnership_index,omitempty"`
	SelectedBuyingStrategy   int                    `bson:"selected_buying_strategy" json:"selected_buying_strategy,omitempty"`
	SelectedPricingStrategy  int                    `bson:"selected_pricing_strategy" json:"selected_pricing_strategy,omitempty"`
	Instances                []ResourceInstanceITF  `bson:"instances,omitempty" json:"instances,omitempty"` // instances resolved at runtime
	WatchedDynamicResource   []string               `bson:"watched_dynamic_resource,omitempty" json:"watched_dynamic_resource,omitempty"` // instance IDs already returned by GetSelectedInstance
}
// GetAccessor returns nil — dynamic resources appear to have no dedicated
// collection and are resolved against the concrete resource collections
// instead (see SetAllowedInstances). Callers must tolerate the nil accessor.
func (d *DynamicResource) GetAccessor(request *tools.APIRequest) utils.Accessor {
	return nil
}
// SetAllowedInstances resolves this dynamic resource: it searches the concrete
// collection matching d.Type with d.Filters, collects the allowed instances of
// every hit (recording the creator peer and source resource per instance
// index), resets the watched list, and returns the resolved instances.
func (d *DynamicResource) SetAllowedInstances(request *tools.APIRequest, instance_id ...string) []ResourceInstanceITF {
	d.Instances = []ResourceInstanceITF{}
	for k, v := range map[tools.DataType]ResourceInterface{
		tools.COMPUTE_RESOURCE:    &ComputeResource{},
		tools.DATA_RESOURCE:       &DataResource{},
		tools.STORAGE_RESOURCE:    &StorageResource{},
		tools.PROCESSING_RESOURCE: &ProcessingResource{},
		tools.WORKFLOW_RESOURCE:   &WorkflowResource{}} {
		if d.Type != k {
			continue
		}
		access := NewAccessor[*DynamicResource](k, request)
		a, _, _ := access.Search(dbs.FiltersFromFlatMap(d.Filters, v), "", false, 0, 100000)
		d.PeerIds = map[int]string{}
		d.ResourceIds = map[int]string{}
		for _, res := range a {
			for _, i := range res.(ResourceInterface).SetAllowedInstances(request, instance_id...) {
				d.PeerIds[len(d.Instances)] = res.GetCreatorID()
				d.ResourceIds[len(d.Instances)] = res.GetID()
				d.Instances = append(d.Instances, i)
			}
		}
		break
	}
	// BUG FIX: the implicit "partnerships" rule used to be (re)assigned inside
	// the sort comparator, which mutated SortRules on every single comparison
	// and panicked when SortRules was a nil map. Install it once, up front.
	if d.SortRules == nil {
		d.SortRules = map[string]string{}
	}
	d.SortRules["partnerships"] = "%v not contains 2"
	sorted := make([]ResourceInstanceITF, len(d.Instances))
	copy(sorted, d.Instances)
	slices.SortStableFunc(sorted, func(a, b ResourceInstanceITF) int {
		return d.compareByRules(a, b, d.SortRules)
	})
	// NOTE(review): the sorted copy is computed but the unsorted d.Instances is
	// returned — this matches the original behavior, but confirm whether the
	// sorted order was meant to be kept.
	d.WatchedDynamicResource = []string{}
	return d.Instances
}
// AddInstances appends a single resolved instance to the dynamic resource.
func (d *DynamicResource) AddInstances(instance ResourceInstanceITF) {
	d.Instances = append(d.Instances, instance)
}
// GetSelectedInstance returns the first not-yet-watched instance that exposes
// a usable pricing profile, marking it watched and recording the selected
// instance/partnership indexes. Returns nil when no candidate qualifies.
// NOTE(review): the index parameter is unused — kept for interface compatibility.
func (d *DynamicResource) GetSelectedInstance(index *int) ResourceInstanceITF {
	if len(d.Instances) == 0 {
		return nil
	}
	for i, inst := range d.Instances {
		if slices.Contains(d.WatchedDynamicResource, inst.GetID()) {
			continue
		}
		d.WatchedDynamicResource = append(d.WatchedDynamicResource, inst.GetID())
		d.SelectedIndex = i
		// BUG FIX: reset the partnership selection per candidate — a stale
		// index left over from a previous iteration used to make the nil
		// check below pass for an instance that has no usable profile.
		d.SelectedPartnershipIndex = nil
		// BUG FIX: the inner loop used to shadow i, so PeerIds was indexed by
		// the partnership index instead of the instance index, and the address
		// of the live loop variable was stored in SelectedPartnershipIndex.
		for pi := range inst.GetPartnerships() {
			pi := pi
			if inst.GetProfile(d.PeerIds[i], &pi, &d.SelectedBuyingStrategy, &d.SelectedPricingStrategy) != nil {
				d.SelectedPartnershipIndex = &pi
				break
			}
		}
		if d.SelectedPartnershipIndex == nil {
			continue
		}
		return inst
	}
	return nil
}
// compareByRules orders instances so those satisfying more sort rules come first.
// When both satisfy a rule, the one with the lower first-attribute value wins (ASC strict).
// Key format: "attrA" for single-placeholder rules, "attrA,attrB" for two-placeholder rules.
func (ri *DynamicResource) compareByRules(a, b ResourceInstanceITF, rules map[string]string) int {
	ma := a.Serialize(a)
	mb := b.Serialize(b)
	// BUG FIX: ranging over the map directly made the comparator depend on
	// Go's randomized map iteration order, so two sorts over the same data
	// could disagree. Apply the rules in stable (sorted-key) order instead.
	keys := make([]string, 0, len(rules))
	for k := range rules {
		keys = append(keys, k)
	}
	slices.Sort(keys)
	for _, attrs := range keys {
		rule := rules[attrs]
		attrPaths := strings.Split(attrs, ",")
		aOk, aFirst := ri.ruleMatchesAny(rule, attrPaths, ma)
		bOk, bFirst := ri.ruleMatchesAny(rule, attrPaths, mb)
		if aOk && !bOk {
			return -1
		}
		if !aOk && bOk {
			return 1
		}
		if aOk && bOk {
			if aFirst < bFirst {
				return -1
			}
			if aFirst > bFirst {
				return 1
			}
		}
	}
	return 0
}
// ruleMatchesAny checks whether any candidate value (or pair of values for
// two-placeholder rules) satisfies rule. Arrays at any path level are
// iterated. Returns (matched, firstMatchingValue).
func (ri *DynamicResource) ruleMatchesAny(rule string, attrPaths []string, m map[string]interface{}) (bool, string) {
	// BUG FIX: placeholders were counted with "%s" only, so rules written with
	// the "%v" verb (e.g. the implicit "%v not contains 2" partnership rule
	// installed by SetAllowedInstances) were never evaluated. Count both verbs.
	placeholders := strings.Count(rule, "%s") + strings.Count(rule, "%v")
	if placeholders == 0 {
		return false, ""
	}
	valsA := ri.getVals(strings.Split(strings.TrimSpace(attrPaths[0]), "."), m)
	if placeholders == 1 {
		// Single-placeholder rule: any value of the first attribute may match.
		for _, v := range valsA {
			if ri.byRules(rule, v) {
				return true, fmt.Sprintf("%v", v)
			}
		}
		return false, ""
	}
	// Two-placeholder rule requires a second attribute path.
	if len(attrPaths) < 2 {
		return false, ""
	}
	valsB := ri.getVals(strings.Split(strings.TrimSpace(attrPaths[1]), "."), m)
	for _, a := range valsA {
		for _, b := range valsB {
			if ri.byRules(rule, a, b) {
				return true, fmt.Sprintf("%v", a)
			}
		}
	}
	return false, ""
}
// getVals navigates attrs into m, collecting all leaf values.
// At each level it detects whether the value is a dict (map) or an array and acts accordingly:
//   - array of maps → recurse into each element with the remaining path
//   - array of scalars (leaf) → collect all as strings
//   - map → recurse with the remaining path
//
// The shape probing is done by re-marshalling the value and attempting typed
// unmarshals, so mixed-type arrays fall through to the scalar/nil cases.
func (ri *DynamicResource) getVals(attrs []string, m map[string]interface{}) []interface{} {
	if len(attrs) == 0 {
		return nil
	}
	attr := attrs[0]
	// Missing or empty path segment: nothing to collect.
	if attr == "" || m[attr] == nil {
		return nil
	}
	b, err := json.Marshal(m[attr])
	if err != nil {
		return nil
	}
	// Leaf level: detect array vs scalar.
	if len(attrs) == 1 {
		var arr []interface{}
		if err := json.Unmarshal(b, &arr); err == nil {
			// Array leaf: every element is collected as its string form.
			results := []interface{}{}
			for _, v := range arr {
				results = append(results, fmt.Sprintf("%v", v))
			}
			return results
		}
		// Scalar leaf: return the original (un-stringified) value.
		return []interface{}{m[attr]}
	}
	// Intermediate level: detect array of maps vs single map.
	var arrMaps []map[string]interface{}
	if err := json.Unmarshal(b, &arrMaps); err == nil {
		// Fan out over every element with the remaining path.
		results := []interface{}{}
		for _, item := range arrMaps {
			results = append(results, ri.getVals(attrs[1:], item)...)
		}
		return results
	}
	nm := map[string]interface{}{}
	if err := json.Unmarshal(b, &nm); err != nil {
		// Neither an array of maps nor a map: path cannot continue.
		return nil
	}
	return ri.getVals(attrs[1:], nm)
}
// byRules evaluates a single comparison rule against the provided values. The
// rule is a fmt template (e.g. "%v <= %v"); values are substituted and the
// result is split around the textual operator.
// NOTE(review): ordering operators (<, >, <=, >=) compare the formatted
// strings lexicographically, so numeric values like "10" sort before "9" —
// confirm this is acceptable for the rules in use.
func (ri *DynamicResource) byRules(rule string, vals ...interface{}) bool {
	if len(vals) == 0 {
		return false
	}
	formatted := fmt.Sprintf(rule, vals...)
	// hm hm
	switch {
	case strings.Contains(rule, "not contains"):
		a := strings.Split(formatted, " not contains ")
		if len(a) < 2 {
			return false
		}
		if reflect.TypeOf(vals[0]).Kind() == reflect.Map {
			// BUG FIX: was `!= nil`, which made "not contains" behave exactly
			// like the positive "contains" branch for map values.
			return vals[0].(map[string]interface{})[fmt.Sprintf("%v", a[1])] == nil
		}
		// BUG FIX: the containment test was not negated.
		return !strings.Contains(a[0], a[1])
	case strings.Contains(rule, "contains"):
		a := strings.Split(formatted, " contains ")
		if len(a) < 2 {
			return false
		}
		if reflect.TypeOf(vals[0]).Kind() == reflect.Map {
			// For maps, "contains" means the key is present.
			return vals[0].(map[string]interface{})[fmt.Sprintf("%v", a[1])] != nil
		}
		return strings.Contains(a[0], a[1])
	case strings.Contains(rule, "<="):
		a := strings.Split(formatted, " <= ")
		return len(a) > 1 && a[0] <= a[1]
	case strings.Contains(rule, ">="):
		a := strings.Split(formatted, " >= ")
		return len(a) > 1 && a[0] >= a[1]
	case strings.Contains(rule, "<>"), strings.Contains(rule, "not like"):
		// Both operators mean "neither side contains the other".
		if strings.Contains(rule, "<>") {
			a := strings.Split(formatted, " <> ")
			return len(a) > 1 && !strings.Contains(a[0], a[1]) && !strings.Contains(a[1], a[0])
		}
		a := strings.Split(formatted, " not like ")
		return len(a) > 1 && !strings.Contains(a[0], a[1]) && !strings.Contains(a[1], a[0])
	case strings.Contains(rule, "<"):
		a := strings.Split(formatted, " < ")
		return len(a) > 1 && a[0] < a[1]
	case strings.Contains(rule, ">"):
		a := strings.Split(formatted, " > ")
		return len(a) > 1 && a[0] > a[1]
	case strings.Contains(rule, "=="):
		a := strings.Split(formatted, " == ")
		return len(a) > 1 && a[0] == a[1]
	case strings.Contains(rule, "!="):
		a := strings.Split(formatted, " != ")
		return len(a) > 1 && a[0] != a[1]
	case strings.Contains(rule, "like"):
		a := strings.Split(formatted, " like ")
		return len(a) > 1 && (strings.Contains(a[0], a[1]) || strings.Contains(a[1], a[0]))
	}
	return false
}
// GetType returns the canonical string identifier of the dynamic resource type.
func (r *DynamicResource) GetType() string {
	return tools.DYNAMIC_RESOURCE.String()
}
// ConvertToPricedResource tries each supported pricing-profile type in turn
// and returns the first successful priced conversion of this dynamic resource.
func (abs *DynamicResource) ConvertToPricedResource(t tools.DataType, selectedInstance *int, selectedPartnership *int, selectedBuyingStrategy *int, selectedStrategy *int, selectedBookingModeIndex *int, request *tools.APIRequest) (pricing.PricedItemITF, error) {
	var p pricing.PricedItemITF
	var err error
	for _, v := range []tools.DataType{
		tools.COMPUTE_RESOURCE,
		tools.DATA_RESOURCE,
		tools.STORAGE_RESOURCE,
		tools.PROCESSING_RESOURCE,
		// NOTE(review): WORKFLOW_RESOURCE is listed but has no case in the
		// switch below, so it is silently skipped — confirm intent.
		tools.WORKFLOW_RESOURCE,
	} {
		switch v {
		case tools.COMPUTE_RESOURCE:
			if p, err = ConvertToPricedResource[*ComputeResourcePricingProfile](t, selectedInstance, selectedPartnership, selectedBuyingStrategy, selectedStrategy, selectedBookingModeIndex, abs, request); err == nil {
				// BUG FIX: this branch asserted *PricedResource[*ProcessingResourcePricingProfile],
				// which would panic on a successful *compute* conversion.
				return p.(*PricedResource[*ComputeResourcePricingProfile]), nil
			}
		case tools.DATA_RESOURCE:
			if p, err = ConvertToPricedResource[*DataResourcePricingProfile](t, selectedInstance, selectedPartnership, selectedBuyingStrategy, selectedStrategy, selectedBookingModeIndex, abs, request); err == nil {
				return p.(*PricedResource[*DataResourcePricingProfile]), nil
			}
		case tools.STORAGE_RESOURCE:
			if p, err = ConvertToPricedResource[*StorageResourcePricingProfile](t, selectedInstance, selectedPartnership, selectedBuyingStrategy, selectedStrategy, selectedBookingModeIndex, abs, request); err == nil {
				return p.(*PricedResource[*StorageResourcePricingProfile]), nil
			}
		case tools.PROCESSING_RESOURCE:
			if p, err = ConvertToPricedResource[*ProcessingResourcePricingProfile](t, selectedInstance, selectedPartnership, selectedBuyingStrategy, selectedStrategy, selectedBookingModeIndex, abs, request); err == nil {
				return p.(*PricedResource[*ProcessingResourcePricingProfile]), nil
			}
		}
	}
	return nil, errors.New("can't convert priced resource")
}

View File

@@ -42,6 +42,7 @@ type ResourceInstanceITF interface {
GetPricingsProfiles(peerID string, groups []string) []pricing.PricingProfileITF
GetPeerGroups() ([]ResourcePartnerITF, []map[string][]string)
ClearPeerGroups()
GetPartnerships() []ResourcePartnerITF
GetAverageDurationS() float64
UpdateAverageDuration(actualS float64)
}

View File

@@ -12,13 +12,18 @@ type ResourceSet struct {
Computes []string `bson:"computes,omitempty" json:"computes,omitempty"`
Workflows []string `bson:"workflows,omitempty" json:"workflows,omitempty"`
NativeTool []string `bson:"native,omitempty" json:"native,omitempty"`
Services []string `bson:"services,omitempty" json:"services,omitempty"`
Dynamics []string `bson:"dynamics,omitempty" json:"dynamics,omitempty"`
// DynamicResources are stored inline — no DB collection, resolved at runtime via SetAllowedInstances.
DynamicResources []*DynamicResource `bson:"-" json:"dynamic_resources,omitempty"`
DataResources []*DataResource `bson:"-" json:"data_resources,omitempty"`
StorageResources []*StorageResource `bson:"-" json:"storage_resources,omitempty"`
ProcessingResources []*ProcessingResource `bson:"-" json:"processing_resources,omitempty"`
ComputeResources []*ComputeResource `bson:"-" json:"compute_resources,omitempty"`
WorkflowResources []*WorkflowResource `bson:"-" json:"workflow_resources,omitempty"`
NativeTools []*NativeTool `bson:"-" json:"native_tools,omitempty"`
ServiceResources []*ServiceResource `bson:"-" json:"service_resources,omitempty"`
}
func (r *ResourceSet) Clear() {
@@ -27,6 +32,8 @@ func (r *ResourceSet) Clear() {
r.ProcessingResources = nil
r.ComputeResources = nil
r.WorkflowResources = nil
r.ServiceResources = nil
r.DynamicResources = nil
}
func (r *ResourceSet) Fill(request *tools.APIRequest) {
@@ -37,6 +44,8 @@ func (r *ResourceSet) Fill(request *tools.APIRequest) {
(&StorageResource{}): r.Storages,
(&ProcessingResource{}): r.Processings,
(&WorkflowResource{}): r.Workflows,
(&ServiceResource{}): r.Services,
(&DynamicResource{}): r.Dynamics,
} {
for _, id := range v {
d, _, e := k.GetAccessor(request).LoadOne(id)
@@ -52,10 +61,17 @@ func (r *ResourceSet) Fill(request *tools.APIRequest) {
r.ProcessingResources = append(r.ProcessingResources, d.(*ProcessingResource))
case *WorkflowResource:
r.WorkflowResources = append(r.WorkflowResources, d.(*WorkflowResource))
case *ServiceResource:
r.ServiceResources = append(r.ServiceResources, d.(*ServiceResource))
case *DynamicResource:
r.DynamicResources = append(r.DynamicResources, d.(*DynamicResource))
}
}
}
}
for _, d := range r.DynamicResources {
d.SetAllowedInstances(request)
}
}
type ItemResource struct {
@@ -65,4 +81,6 @@ type ItemResource struct {
Compute *ComputeResource `bson:"compute,omitempty" json:"compute,omitempty"`
Workflow *WorkflowResource `bson:"workflow,omitempty" json:"workflow,omitempty"`
NativeTool *NativeTool `bson:"native_tools,omitempty" json:"native_tools,omitempty"`
Service *ServiceResource `bson:"service,omitempty" json:"service,omitempty"`
Dynamic *DynamicResource `bson:"dynamic,omitempty" json:"dynamic,omitempty"`
}

View File

@@ -28,10 +28,8 @@ type ProcessingUsage struct {
*/
type ProcessingResource struct {
AbstractInstanciatedResource[*ProcessingInstance]
IsEvent bool `json:"is_event,omitempty" bson:"is_event,omitempty"`
Infrastructure enum.InfrastructureType `json:"infrastructure" bson:"infrastructure" default:"-1"` // Infrastructure is the infrastructure
IsService bool `json:"is_service,omitempty" bson:"is_service,omitempty"` // IsService is a flag that indicates if the processing is a service
Usage *ProcessingUsage `bson:"usage,omitempty" json:"usage,omitempty"` // Usage is the usage of the processing
Infrastructure enum.InfrastructureType `json:"infrastructure" bson:"infrastructure" default:"-1"`
Usage *ProcessingUsage `bson:"usage,omitempty" json:"usage,omitempty"`
OpenSource bool `json:"open_source" bson:"open_source" default:"false"`
License string `json:"license,omitempty" bson:"license,omitempty"`
Maturity string `json:"maturity,omitempty" bson:"maturity,omitempty"`
@@ -69,7 +67,6 @@ type ProcessingResourcePartnership struct {
type PricedProcessingResource struct {
PricedResource[*ProcessingResourcePricingProfile]
IsService bool
}
func (r *PricedProcessingResource) ensurePricing() {
@@ -102,10 +99,7 @@ func (a *PricedProcessingResource) GetExplicitDurationInS() float64 {
a.BookingConfiguration = &BookingConfiguration{}
}
if a.BookingConfiguration.ExplicitBookingDurationS == 0 {
if a.IsService || a.BookingConfiguration.UsageStart == nil {
if a.IsService {
return -1
}
if a.BookingConfiguration.UsageStart == nil {
return (5 * time.Minute).Seconds()
}
return a.BookingConfiguration.UsageEnd.Sub(*a.BookingConfiguration.UsageStart).Seconds()

View File

@@ -9,6 +9,7 @@ import (
type PurchaseResource struct {
utils.AbstractObject
FromNano string `json:"from_nano,omitempty" bson:"priced_item,omitempty"`
DestPeerID string `json:"dest_peer_id" bson:"dest_peer_id"`
PricedItem map[string]interface{} `json:"priced_item,omitempty" bson:"priced_item,omitempty" validate:"required"`
ExecutionID string `json:"execution_id,omitempty" bson:"execution_id,omitempty" validate:"required"` // ExecutionsID is the ID of the executions
@@ -24,6 +25,34 @@ type PurchaseResource struct {
SchedulerPeerID string `json:"scheduler_peer_id,omitempty" bson:"scheduler_peer_id,omitempty"`
}
// Extend maps each requested extension name onto the data types it expands
// to: peers for the peer links, the workflow execution, and every concrete
// resource type for "resource".
func (pr *PurchaseResource) Extend(typ ...string) map[string][]tools.DataType {
	ext := pr.AbstractObject.Extend(typ...)
	related := map[string][]tools.DataType{
		"dest_peer":      {tools.PEER},
		"scheduler_peer": {tools.PEER},
		"execution":      {tools.WORKFLOW_EXECUTION},
		"resource": {
			tools.WORKFLOW_RESOURCE,
			tools.DATA_RESOURCE,
			tools.COMPUTE_RESOURCE,
			tools.STORAGE_RESOURCE,
			tools.PROCESSING_RESOURCE,
			tools.SERVICE_RESOURCE,
		},
	}
	for _, t := range typ {
		types, known := related[t]
		if !known {
			continue
		}
		if _, ok := ext[t]; !ok {
			ext[t] = []tools.DataType{}
		}
		ext[t] = append(ext[t], types...)
	}
	return ext
}
func (d *PurchaseResource) GetAccessor(request *tools.APIRequest) utils.Accessor {
return NewAccessor(request) // Create a new instance of the accessor
}

View File

@@ -30,7 +30,7 @@ func NewAccessor(request *tools.APIRequest) *PurchaseResourceMongoAccessor {
func (a *PurchaseResourceMongoAccessor) LoadOne(id string) (utils.DBObject, int, error) {
return utils.GenericLoadOne(id, a.New(), func(d utils.DBObject) (utils.DBObject, int, error) {
if d.(*PurchaseResource).EndDate != nil && time.Now().UTC().After(*d.(*PurchaseResource).EndDate) {
utils.GenericDeleteOne(id, a)
utils.GenericDelete(d, a)
return nil, 404, nil
}
return d, 200, nil
@@ -40,9 +40,13 @@ func (a *PurchaseResourceMongoAccessor) LoadOne(id string) (utils.DBObject, int,
// GetExec returns the per-document hook used by list operations: purchases
// whose EndDate is in the past are lazily deleted from the store and filtered
// out of the results (nil return drops the document).
func (a *PurchaseResourceMongoAccessor) GetExec(isDraft bool) func(utils.DBObject) utils.ShallowDBObject {
	return func(d utils.DBObject) utils.ShallowDBObject {
		if d.(*PurchaseResource).EndDate != nil && time.Now().UTC().After(*d.(*PurchaseResource).EndDate) {
			utils.GenericDelete(d, a)
			return nil
		}
		return d
	}
}
// ShouldVerifyAuth disables authorization checks for purchase resources.
// NOTE(review): marked TEMP by the author — this bypasses auth entirely and
// must be re-enabled before production use.
func (dca *PurchaseResourceMongoAccessor) ShouldVerifyAuth() bool {
	return false // TEMP : by pass
}

View File

@@ -11,6 +11,7 @@ import (
"cloud.o-forge.io/core/oc-lib/models/booking"
"cloud.o-forge.io/core/oc-lib/models/common/models"
"cloud.o-forge.io/core/oc-lib/models/common/pricing"
"cloud.o-forge.io/core/oc-lib/models/live"
"cloud.o-forge.io/core/oc-lib/models/peer"
"cloud.o-forge.io/core/oc-lib/models/resources/purchase_resource"
"cloud.o-forge.io/core/oc-lib/models/utils"
@@ -19,10 +20,14 @@ import (
"github.com/google/uuid"
)
// FiltersFromFlatMap re-exports dbs.FiltersFromFlatMap so resource code can
// build DB filters without importing the dbs package directly.
func FiltersFromFlatMap(flatMap map[string]interface{}, target interface{}) *dbs.Filters {
	return dbs.FiltersFromFlatMap(flatMap, target)
}
// AbstractResource is the struct containing all of the attributes commons to all ressources
type AbstractResource struct {
utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name)
PurchaseInfo *purchase_resource.PurchaseResource `json:"purchase_info,omitempty"` // is_buy precise if a resource is buy or not
PurchaseID string `json:"purchase_id,omitempty"` // is_buy precise if a resource is buy or not
Type string `json:"type,omitempty" bson:"type,omitempty"` // Type is the type of the resource
Logo string `json:"logo,omitempty" bson:"logo,omitempty"` // Logo is the logo of the resource
Description string `json:"description,omitempty" bson:"description,omitempty"` // Description is the description of the resource
@@ -36,6 +41,20 @@ type AbstractResource struct {
Outputs []models.Param `json:"outputs,omitempty" bson:"outputs,omitempty"`
}
// Extend augments the base extension map: every requested "purchase"
// extension additionally resolves to the purchase-resource data type.
func (ar *AbstractResource) Extend(typ ...string) map[string][]tools.DataType {
	ext := ar.AbstractObject.Extend(typ...)
	for _, name := range typ {
		if name != "purchase" {
			continue
		}
		if _, ok := ext[name]; !ok {
			ext[name] = []tools.DataType{}
		}
		ext[name] = append(ext[name], tools.PURCHASE_RESOURCE)
	}
	return ext
}
func (abs *AbstractResource) VerifyBuy() {
p := &purchase_resource.PurchaseResource{}
access := p.GetAccessor(&tools.APIRequest{Admin: true})
@@ -46,7 +65,7 @@ func (abs *AbstractResource) VerifyBuy() {
},
}, "", false, 0, 1)
if len(purchase) > 0 {
abs.PurchaseInfo = purchase[0].(*purchase_resource.PurchaseResource)
abs.PurchaseID = purchase[0].GetID()
}
}
@@ -232,17 +251,12 @@ func VerifyAuthAction[T ResourceInstanceITF](baseInstance []T, request *tools.AP
return instances
}
type GeoPoint struct {
Latitude float64 `json:"latitude,omitempty" bson:"latitude,omitempty"`
Longitude float64 `json:"longitude,omitempty" bson:"longitude,omitempty"`
}
type ResourceInstance[T ResourcePartnerITF] struct {
utils.AbstractObject
ContentType string `json:"content_type,omitempty" bson:"content_type,omitempty"`
LastUpdate time.Time `json:"last_update,omitempty" bson:"last_update,omitempty"`
Origin OriginMeta `json:"origin,omitempty" bson:"origin,omitempty"`
Location GeoPoint `json:"location,omitempty" bson:"location,omitempty"`
Location live.GeoPoint `json:"location,omitempty" bson:"location,omitempty"`
Country countries.CountryCode `json:"country,omitempty" bson:"country,omitempty"`
AccessProtocol string `json:"access_protocol,omitempty" bson:"access_protocol,omitempty"`
@@ -367,6 +381,14 @@ func (ri *ResourceInstance[T]) UpdateAverageDuration(actualS float64) {
ri.AverageDurationSamples++
}
// GetPartnerships exposes the instance's partnerships through the generic
// ResourcePartnerITF interface so callers do not depend on the concrete T.
// The slice is pre-sized to avoid repeated growth copies during append.
func (ri *ResourceInstance[T]) GetPartnerships() []ResourcePartnerITF {
	rt := make([]ResourcePartnerITF, 0, len(ri.Partnerships))
	for _, p := range ri.Partnerships {
		rt = append(rt, p)
	}
	return rt
}
type ResourcePartnerShip[T pricing.PricingProfileITF] struct {
Namespace string `json:"namespace" bson:"namespace" default:"default-namespace"`
PeerGroups map[string][]string `json:"peer_groups,omitempty" bson:"peer_groups,omitempty"`
@@ -497,6 +519,12 @@ func ToResource(
return nil, err
}
return &data, nil
case tools.SERVICE_RESOURCE.EnumIndex():
var data ServiceResource
if err := json.Unmarshal(payload, &data); err != nil {
return nil, err
}
return &data, nil
}
return nil, errors.New("can't found any data resources matching")
}

View File

@@ -2,10 +2,12 @@ package resources
import (
"errors"
"fmt"
"slices"
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/logs"
"cloud.o-forge.io/core/oc-lib/models/live"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
)
@@ -18,8 +20,8 @@ type ResourceMongoAccessor[T ResourceInterface] struct {
func NewAccessor[T ResourceInterface](t tools.DataType, request *tools.APIRequest) *ResourceMongoAccessor[T] {
if !slices.Contains([]tools.DataType{
tools.COMPUTE_RESOURCE, tools.STORAGE_RESOURCE,
tools.PROCESSING_RESOURCE, tools.WORKFLOW_RESOURCE,
tools.DATA_RESOURCE, tools.NATIVE_TOOL,
tools.PROCESSING_RESOURCE, tools.SERVICE_RESOURCE,
tools.WORKFLOW_RESOURCE, tools.DATA_RESOURCE, tools.NATIVE_TOOL,
}, t) {
return nil
}
@@ -36,6 +38,8 @@ func NewAccessor[T ResourceInterface](t tools.DataType, request *tools.APIReques
return &StorageResource{}
case tools.PROCESSING_RESOURCE:
return &ProcessingResource{}
case tools.SERVICE_RESOURCE:
return &ServiceResource{}
case tools.WORKFLOW_RESOURCE:
return &WorkflowResource{}
case tools.DATA_RESOURCE:
@@ -65,7 +69,12 @@ func (dca *ResourceMongoAccessor[T]) LoadOne(id string) (utils.DBObject, int, er
func (dca *ResourceMongoAccessor[T]) UpdateOne(set map[string]interface{}, id string) (utils.DBObject, int, error) {
if dca.GetType() == tools.COMPUTE_RESOURCE {
return nil, 404, errors.New("can't update a non existing computing units resource not reported onto compute units catalog")
delete(set, "architecture")
delete(set, "infrastructure")
} else if dca.GetType() == tools.SERVICE_RESOURCE {
delete(set, "infrastructure")
} else if dca.GetType() == tools.STORAGE_RESOURCE {
delete(set, "storage_type")
}
return utils.GenericUpdateOne(set, id, dca)
}
@@ -75,16 +84,66 @@ func (dca *ResourceMongoAccessor[T]) ShouldVerifyAuth() bool {
}
func (dca *ResourceMongoAccessor[T]) StoreOne(data utils.DBObject) (utils.DBObject, int, error) {
var i string
idsToUpdate := []string{}
var a utils.Accessor
if dca.GetType() == tools.COMPUTE_RESOURCE {
return nil, 404, errors.New("can't create a non existing computing units resource not reported onto compute units catalog")
r := data.(*ComputeResource)
if len(r.Instances) == 0 {
return nil, 404, errors.New("can't create a non existing computing units resource with no instances")
}
a = live.NewAccessor[*live.LiveDatacenter](tools.LIVE_DATACENTER, &tools.APIRequest{Admin: true})
res, _, _ := a.LoadOne(r.Instances[0].GetID())
if res == nil {
return nil, 404, errors.New("can't create a non existing computing units resource not reported onto compute units catalog")
}
if !res.(*live.LiveDatacenter).IsCompatible(data.Serialize(data)) {
return nil, 404, errors.New("live computing units target is not compatible")
}
i = res.GetID()
idsToUpdate = res.(*live.LiveDatacenter).ResourcesID
} else if dca.GetType() == tools.SERVICE_RESOURCE {
r := data.(*ServiceResource)
if len(r.Instances) == 0 {
return nil, 404, errors.New("can't create a non existing service resource with no instances")
}
a = live.NewAccessor[*live.LiveService](tools.LIVE_SERVICE, &tools.APIRequest{Admin: true})
res, _, _ := a.LoadOne(r.Instances[0].GetID())
if res == nil {
return nil, 404, errors.New("can't create a non existing service resource not reported onto compute units catalog")
}
if !res.(*live.LiveService).IsCompatible(data.Serialize(data)) {
return nil, 404, errors.New("live service target is not compatible")
}
i = res.GetID()
idsToUpdate = res.(*live.LiveService).ResourcesID
} else if dca.GetType() == tools.STORAGE_RESOURCE {
r := data.(*StorageResource)
if len(r.Instances) == 0 {
return nil, 404, errors.New("can't create a non existing storage resource with no instances")
}
a = live.NewAccessor[*live.LiveStorage](tools.LIVE_STORAGE, &tools.APIRequest{Admin: true})
res, _, _ := a.LoadOne(r.Instances[0].GetID())
if res == nil {
return nil, 404, errors.New("can't create a non existing storage resource not reported onto compute units catalog")
}
if !res.(*live.LiveStorage).IsCompatible(data.Serialize(data)) {
return nil, 404, errors.New("live storage target is not compatible")
}
i = res.GetID()
idsToUpdate = res.(*live.LiveStorage).ResourcesID
}
return utils.GenericStoreOne(data, dca)
res, code, err := utils.GenericStoreOne(data, dca)
if res != nil && i != "" {
idsToUpdate = append(idsToUpdate, res.GetID())
a.UpdateOne(map[string]interface{}{
"resources_id": idsToUpdate,
}, i)
}
return res, code, err
}
func (dca *ResourceMongoAccessor[T]) CopyOne(data utils.DBObject) (utils.DBObject, int, error) {
if dca.GetType() == tools.COMPUTE_RESOURCE {
return nil, 404, errors.New("can't copy/publish a non existing computing units resource not reported onto compute units catalog")
}
return dca.StoreOne(data)
}
@@ -95,8 +154,10 @@ func (wfa *ResourceMongoAccessor[T]) LoadAll(isDraft bool, offset int64, limit i
func (wfa *ResourceMongoAccessor[T]) Search(filters *dbs.Filters, search string, isDraft bool, offset int64, limit int64) ([]utils.ShallowDBObject, int, error) {
if filters == nil && search == "*" {
return utils.GenericLoadAll[T](func(d utils.DBObject) utils.ShallowDBObject {
fmt.Println("Search", d)
d.(T).VerifyBuy()
d.(T).SetAllowedInstances(wfa.Request)
fmt.Println("Search2", d)
return d
}, isDraft, wfa, offset, limit)
}

198
models/resources/service.go Executable file
View File

@@ -0,0 +1,198 @@
package resources
import (
"errors"
"time"
"cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/common/models"
"cloud.o-forge.io/core/oc-lib/models/common/pricing"
"cloud.o-forge.io/core/oc-lib/models/live"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
"github.com/google/uuid"
)
// ServiceMode selects how a service is consumed and billed.
type ServiceMode int

const (
	DEPLOYMENT ServiceMode = iota // deploy the service, pay for uptime — duration unbounded
	HOSTED                        // use an existing service, pay per call — duration per request
)

// String renders the mode for logs and error messages. Unknown values map to
// "UNKNOWN" instead of panicking, which the previous array-index form
// ([...]string{...}[m]) did for any mode outside DEPLOYMENT/HOSTED.
func (m ServiceMode) String() string {
	switch m {
	case DEPLOYMENT:
		return "DEPLOYMENT"
	case HOSTED:
		return "HOSTED"
	default:
		return "UNKNOWN"
	}
}
// ServiceUsage describes the resource footprint a service consumes
// (CPUs, GPUs, RAM, storage) plus free-text notes about the sizing
// hypothesis and the scaling model.
type ServiceUsage struct {
	CPUs map[string]*models.CPU `bson:"cpus,omitempty" json:"cpus,omitempty"` // keyed CPU requirements
	GPUs map[string]*models.GPU `bson:"gpus,omitempty" json:"gpus,omitempty"` // keyed GPU requirements
	RAM *models.RAM `bson:"ram,omitempty" json:"ram,omitempty"`
	StorageGb float64 `bson:"storage,omitempty" json:"storage,omitempty"` // storage need in GB
	Hypothesis string `bson:"hypothesis,omitempty" json:"hypothesis,omitempty"` // free-text sizing hypothesis
	ScalingModel string `bson:"scaling_model,omitempty" json:"scaling_model,omitempty"` // free-text scaling model description
}
// ServiceResourceAccess describes how to reach the service once running.
// Populated for HOSTED instances (endpoint already known) and as a template for DEPLOYMENT.
type ServiceResourceAccess struct {
Container *models.Container `json:"container,omitempty" bson:"container,omitempty"`
Protocol live.ServiceProtocol `json:"protocol" bson:"protocol" default:"0"`
EndpointPattern string `json:"endpoint_pattern,omitempty" bson:"endpoint_pattern,omitempty"`
HealthCheckPath string `json:"health_check_path,omitempty" bson:"health_check_path,omitempty"`
}
// ServiceResource is a catalog entry for a deployable or hosted service.
// It extends AbstractInstanciatedResource with service-specific metadata.
type ServiceResource struct {
	AbstractInstanciatedResource[*ServiceInstance]
	Infrastructure enum.InfrastructureType `json:"infrastructure" bson:"infrastructure" default:"-1"` // defaults to -1 per struct tag
	Usage *ServiceUsage `bson:"usage,omitempty" json:"usage,omitempty"` // expected resource footprint
	OpenSource bool `json:"open_source" bson:"open_source" default:"false"`
	License string `json:"license,omitempty" bson:"license,omitempty"`
	Maturity string `json:"maturity,omitempty" bson:"maturity,omitempty"` // free-text maturity level
}
// GetType returns the canonical type name of a service resource.
func (r *ServiceResource) GetType() string {
	return tools.SERVICE_RESOURCE.String()
}

// GetAccessor builds the accessor handling ServiceResource persistence.
func (d *ServiceResource) GetAccessor(request *tools.APIRequest) utils.Accessor {
	return NewAccessor[*ServiceResource](tools.SERVICE_RESOURCE, request)
}
// ConvertToPricedResource turns this service resource into its priced
// counterpart for the given selection (instance, partnership, buying
// strategy, strategy, booking mode). It rejects any DataType other than
// SERVICE_RESOURCE, delegates to the generic ConvertToPricedResource, and
// wraps the result in a PricedServiceResource.
func (abs *ServiceResource) ConvertToPricedResource(t tools.DataType, selectedInstance *int, selectedPartnership *int, selectedBuyingStrategy *int, selectedStrategy *int, selectedBookingModeIndex *int, request *tools.APIRequest) (pricing.PricedItemITF, error) {
	if t != tools.SERVICE_RESOURCE {
		return nil, errors.New("not the proper type expected : cannot convert to priced resource : have " + t.String() + " wait Service")
	}
	p, err := ConvertToPricedResource[*ServiceResourcePricingProfile](t, selectedInstance, selectedPartnership, selectedBuyingStrategy, selectedStrategy, selectedBookingModeIndex, abs, request)
	if err != nil {
		return nil, err
	}
	priced := p.(*PricedResource[*ServiceResourcePricingProfile])
	return &PricedServiceResource{PricedResource: *priced}, nil
}
// ServiceInstance is one concrete deployment/hosting option of a service
// resource, carrying its consumption mode, access description and an optional
// concurrency cap.
type ServiceInstance struct {
	ResourceInstance[*ServiceResourcePartnership]
	Mode ServiceMode `json:"mode" bson:"mode" default:"0"` // defaults to 0 (DEPLOYMENT) per struct tag
	Access *ServiceResourceAccess `json:"access,omitempty" bson:"access,omitempty"` // how to reach the service once running
	MaxConcurrent int `json:"max_concurrent,omitempty" bson:"max_concurrent,omitempty"` // 0 means no explicit cap
}

// IsPeerless always returns false for service instances.
func (ri *ServiceInstance) IsPeerless() bool { return false }
// NewServiceInstance builds a ServiceInstance with a fresh UUID and the given
// name.
// NOTE(review): the peerID parameter is currently unused — confirm whether it
// should be stored on the instance, as the signature suggests.
func NewServiceInstance(name string, peerID string) ResourceInstanceITF {
	return &ServiceInstance{
		ResourceInstance: ResourceInstance[*ServiceResourcePartnership]{
			AbstractObject: utils.AbstractObject{
				UUID: uuid.New().String(),
				Name: name,
			},
		},
	}
}
type ServiceResourcePartnership struct {
ResourcePartnerShip[*ServiceResourcePricingProfile]
}
// ServiceResourcePricingProfile handles both service modes:
// - DEPLOYMENT: uptime billing via ExploitPricingProfile (pay while service is up)
// - HOSTED: per-call billing via AccessPricingProfile (pay per request)
type ServiceResourcePricingProfile struct {
Mode ServiceMode `json:"mode" bson:"mode"`
UptimePricing *pricing.ExploitPricingProfile[pricing.TimePricingStrategy] `json:"uptime_pricing,omitempty" bson:"uptime_pricing,omitempty"`
AccessPricing *pricing.AccessPricingProfile[pricing.TimePricingStrategy] `json:"access_pricing,omitempty" bson:"access_pricing,omitempty"`
}
// ensure lazily initializes both pricing sub-profiles so that the mode-based
// dispatch in the methods below never dereferences a nil pointer.
func (p *ServiceResourcePricingProfile) ensure() {
	if p.UptimePricing == nil {
		p.UptimePricing = new(pricing.ExploitPricingProfile[pricing.TimePricingStrategy])
	}
	if p.AccessPricing == nil {
		p.AccessPricing = new(pricing.AccessPricingProfile[pricing.TimePricingStrategy])
	}
}
// IsPurchasable dispatches on the service mode: uptime pricing answers for
// DEPLOYMENT, per-call pricing for every other mode.
func (p *ServiceResourcePricingProfile) IsPurchasable() bool {
	p.ensure()
	switch p.Mode {
	case DEPLOYMENT:
		return p.UptimePricing.IsPurchasable()
	default:
		return p.AccessPricing.IsPurchasable()
	}
}
// IsBooked dispatches on the service mode: uptime pricing answers for
// DEPLOYMENT, per-call pricing for every other mode.
func (p *ServiceResourcePricingProfile) IsBooked() bool {
	p.ensure()
	switch p.Mode {
	case DEPLOYMENT:
		return p.UptimePricing.IsBooked()
	default:
		return p.AccessPricing.IsBooked()
	}
}
// GetPurchase returns the buying strategy of the active sub-profile:
// uptime pricing for DEPLOYMENT, per-call pricing otherwise.
func (p *ServiceResourcePricingProfile) GetPurchase() pricing.BuyingStrategy {
	p.ensure()
	switch p.Mode {
	case DEPLOYMENT:
		return p.UptimePricing.GetPurchase()
	default:
		return p.AccessPricing.GetPurchase()
	}
}
// GetOverrideStrategyValue always returns -1: service pricing defines no
// strategy override (assumed sentinel value — confirm against callers).
func (p *ServiceResourcePricingProfile) GetOverrideStrategyValue() int {
	return -1
}
// GetPriceHT computes the pre-tax price through the sub-profile matching the
// service mode: per-call pricing unless the mode is DEPLOYMENT, in which case
// uptime pricing applies.
func (p *ServiceResourcePricingProfile) GetPriceHT(quantity float64, val float64, start time.Time, end time.Time, variations []*pricing.PricingVariation, params ...string) (float64, error) {
	p.ensure()
	if p.Mode != DEPLOYMENT {
		return p.AccessPricing.GetPriceHT(quantity, val, start, end, variations, params...)
	}
	return p.UptimePricing.GetPriceHT(quantity, val, start, end, variations, params...)
}
// PricedServiceResource is the priced view of a ServiceResource after an
// instance/partnership/strategy selection has been made.
type PricedServiceResource struct {
	PricedResource[*ServiceResourcePricingProfile]
}

// ensurePricing lazily initializes SelectedPricing so the delegating methods
// below never dereference a nil profile.
func (r *PricedServiceResource) ensurePricing() {
	if r.SelectedPricing == nil {
		r.SelectedPricing = &ServiceResourcePricingProfile{}
	}
}

// IsPurchasable delegates to the selected pricing profile.
func (r *PricedServiceResource) IsPurchasable() bool {
	r.ensurePricing()
	return r.SelectedPricing.IsPurchasable()
}

// IsBooked delegates to the selected pricing profile.
func (r *PricedServiceResource) IsBooked() bool {
	r.ensurePricing()
	return r.SelectedPricing.IsBooked()
}

// GetType identifies this priced item as a SERVICE_RESOURCE.
func (r *PricedServiceResource) GetType() tools.DataType {
	return tools.SERVICE_RESOURCE
}

// GetPriceHT delegates the price computation to the embedded PricedResource.
func (r *PricedServiceResource) GetPriceHT() (float64, error) {
	r.ensurePricing()
	return r.PricedResource.GetPriceHT()
}
// GetExplicitDurationInS returns -1 for DEPLOYMENT (unbounded uptime).
// For HOSTED, it returns the explicit booking duration when one is set,
// otherwise the usage-window length, or -1 when no window is specified.
func (a *PricedServiceResource) GetExplicitDurationInS() float64 {
	a.ensurePricing()
	if a.SelectedPricing.Mode == DEPLOYMENT {
		return -1 // uptime billing has no fixed end
	}
	if a.BookingConfiguration == nil {
		a.BookingConfiguration = &BookingConfiguration{}
	}
	cfg := a.BookingConfiguration
	if d := cfg.ExplicitBookingDurationS; d != 0 {
		return d
	}
	if cfg.UsageStart == nil || cfg.UsageEnd == nil {
		return -1 // no deadline specified: open-ended
	}
	return cfg.UsageEnd.Sub(*cfg.UsageStart).Seconds()
}

View File

@@ -12,6 +12,17 @@ import (
"github.com/google/uuid"
)
// EmbeddedStorageSelection records which storage capability was activated on a
// compute unit graph node, and which pricing options were selected for it.
// Key in WorkflowExecution.SelectedEmbeddedStorages is the compute graph node ID.
// A nil/absent entry means no storage was activated on that compute unit.
type EmbeddedStorageSelection struct {
StorageIndex int `json:"storage_index" bson:"storage_index"` // index in ComputeResourceInstance.AvailableStorages
PartnershipIndex int `json:"partnership_index" bson:"partnership_index"` // index in the storage's partnerships
BuyingIndex int `json:"buying_index" bson:"buying_index"`
StrategyIndex int `json:"strategy_index" bson:"strategy_index"`
}
/*
* StorageResource is a struct that represents a storage resource
* it defines the resource storage

View File

@@ -30,13 +30,6 @@ func TestPricedProcessingResource_GetExplicitDurationInS(t *testing.T) {
input PricedProcessingResource
expected float64
}{
{
name: "Service without explicit duration",
input: PricedProcessingResource{
IsService: true,
},
expected: -1,
},
{
name: "Nil start time, non-service",
input: PricedProcessingResource{

View File

@@ -44,6 +44,20 @@ type AbstractObject struct {
Signature []byte `bson:"signature,omitempty" json:"signature,omitempty"`
}
// Extend maps each requested extension key to the DataTypes it resolves to.
// At this level only the user-related keys are recognized; each of them
// points at PEER.
func (ri *AbstractObject) Extend(typ ...string) map[string][]tools.DataType {
	out := map[string][]tools.DataType{}
	for _, key := range typ {
		if key != "creator" && key != "user_creator" && key != "user_updater" {
			continue
		}
		if _, seen := out[key]; !seen {
			out[key] = []tools.DataType{}
		}
		out[key] = append(out[key], tools.PEER)
	}
	return out
}
func (ri *AbstractObject) GetAccessor(request *tools.APIRequest) Accessor {
return nil
}

View File

@@ -17,6 +17,9 @@ type Owner struct {
}
func VerifyAccess(a Accessor, id string) error {
if a == nil {
return errors.New("no accessor to verify access")
}
data, _, err := a.LoadOne(id)
if err != nil {
return err
@@ -79,18 +82,22 @@ func GenericDeleteOne(id string, a Accessor) (DBObject, int, error) {
if res == nil {
return res, code, errors.New("not found")
}
return GenericDelete(res, a)
}
func GenericDelete(res DBObject, a Accessor) (DBObject, int, error) {
if !res.CanDelete() {
return nil, 403, errors.New("you are not allowed to delete :" + a.GetType().String())
}
if a.ShouldVerifyAuth() && !res.VerifyAuth("delete", a.GetRequest()) {
return nil, 403, errors.New("you are not allowed to access " + a.GetType().String())
}
_, code, err = mongo.MONGOService.DeleteOne(id, a.GetType().String())
_, code, err := mongo.MONGOService.DeleteOne(res.GetID(), a.GetType().String())
if err != nil {
a.GetLogger().Error().Msg("Could not delete " + id + " to db. Error: " + err.Error())
a.GetLogger().Error().Msg("Could not delete " + res.GetID() + " to db. Error: " + err.Error())
return nil, code, err
}
go NotifyChange(a.GetType(), id, res, true)
go NotifyChange(a.GetType(), res.GetID(), res, true)
return res, 200, nil
}
@@ -162,7 +169,6 @@ func GenericLoadOne[T DBObject](id string, data T, f func(DBObject) (DBObject, i
if err = res_mongo.Decode(data); err != nil {
return nil, 400, err
}
if a.ShouldVerifyAuth() && !data.VerifyAuth("get", a.GetRequest()) {
return nil, 403, errors.New("you are not allowed to access :" + a.GetType().String())
}

View File

@@ -8,6 +8,7 @@ import (
// ShallowDBObject is an interface that defines the basic methods shallowed version of a DBObject
type ShallowDBObject interface {
DBObject
GenerateID()
GetID() string
GetName() string
@@ -18,6 +19,7 @@ type ShallowDBObject interface {
// DBObject is an interface that defines the basic methods for a DBObject
type DBObject interface {
GenerateID()
Extend(typ ...string) map[string][]tools.DataType
SetNotInCatalog(bool)
IsNotInCatalog() bool
SetID(id string)

View File

@@ -45,6 +45,10 @@ func (wf *Graph) IsProcessing(item GraphItem) bool {
return item.Processing != nil
}
// IsService reports whether the graph item wraps a service resource.
func (wf *Graph) IsService(item GraphItem) bool {
	return item.Service != nil
}
// IsNativeTool reports whether the graph item wraps a native tool.
func (wf *Graph) IsNativeTool(item GraphItem) bool {
	return item.NativeTool != nil
}
@@ -65,6 +69,10 @@ func (wf *Graph) IsWorkflow(item GraphItem) bool {
return item.Workflow != nil
}
// IsDynamic reports whether the graph item wraps a dynamic resource.
func (wf *Graph) IsDynamic(item GraphItem) bool {
	return item.Dynamic != nil
}
func (g *Graph) GetAverageTimeRelatedToProcessingActivity(processings []*resources.ProcessingResource, resource resources.ResourceInterface,
f func(GraphItem) resources.ResourceInterface, instance int, partnership int, buying int, strategy int, bookingMode int, request *tools.APIRequest) (float64, float64, error) {
oneIsInfinite := false
@@ -151,6 +159,8 @@ func (g *Graph) GetResource(id string) (tools.DataType, resources.ResourceInterf
return tools.PROCESSING_RESOURCE, item.Processing
} else if item.Storage != nil {
return tools.STORAGE_RESOURCE, item.Storage
} else if item.Service != nil {
return tools.SERVICE_RESOURCE, item.Service
}
}
return tools.INVALID, nil

View File

@@ -27,6 +27,10 @@ func (g *GraphItem) GetResource() (tools.DataType, resources.ResourceInterface)
return tools.STORAGE_RESOURCE, g.Storage
} else if g.NativeTool != nil {
return tools.NATIVE_TOOL, g.NativeTool
} else if g.Service != nil {
return tools.SERVICE_RESOURCE, g.Service
} else if g.Dynamic != nil {
return tools.DYNAMIC_RESOURCE, g.Dynamic
}
return tools.INVALID, nil
}
@@ -37,4 +41,6 @@ func (g *GraphItem) Clear() {
g.Workflow = nil
g.Processing = nil
g.Storage = nil
g.Service = nil
g.Dynamic = nil
}

View File

@@ -47,11 +47,12 @@ type Workflow struct {
// Schedule *WorkflowSchedule `bson:"schedule,omitempty" json:"schedule,omitempty"` // Schedule is the schedule of the workflow
Shared []string `json:"shared,omitempty" bson:"shared,omitempty"` // Shared is the ID of the shared workflow // AbstractWorkflow contains the basic fields of a workflow
Env map[string][]models.Param `json:"env" bson:"env"`
Inputs map[string][]models.Param `json:"inputs" bson:"inputs"`
Outputs map[string][]models.Param `json:"outputs" bson:"outputs"`
Args map[string][]string `json:"args" bson:"args"`
Exposes map[string][]models.Expose `bson:"exposes" json:"exposes"` // Expose is the execution
Env map[string][]models.Param `json:"env" bson:"env"`
Inputs map[string][]models.Param `json:"inputs" bson:"inputs"`
Outputs map[string][]models.Param `json:"outputs" bson:"outputs"`
Args map[string][]string `json:"args" bson:"args"`
Exposes map[string][]models.Expose `bson:"exposes" json:"exposes"` // Expose is the execution
SelectedEmbeddedStorages map[string]*resources.EmbeddedStorageSelection `json:"selected_embedded_storages,omitempty"`
}
func (d *Workflow) GetAccessor(request *tools.APIRequest) utils.Accessor {
@@ -91,7 +92,18 @@ func (d *Workflow) GetResources(dt tools.DataType) []resources.ResourceInterface
itf = append(itf, d)
}
return itf
case tools.SERVICE_RESOURCE:
for _, d := range d.ServiceResources {
itf = append(itf, d)
}
return itf
case tools.DYNAMIC_RESOURCE:
for _, d := range d.DynamicResources {
itf = append(itf, d)
}
return itf
}
return itf
}
@@ -107,12 +119,16 @@ func (d *Workflow) ExtractFromPlantUML(plantUML multipart.File, request *tools.A
d.Processings = []string{}
d.Computes = []string{}
d.Workflows = []string{}
d.Dynamics = []string{}
d.Services = []string{}
d.DataResources = []*resources.DataResource{}
d.StorageResources = []*resources.StorageResource{}
d.ProcessingResources = []*resources.ProcessingResource{}
d.ComputeResources = []*resources.ComputeResource{}
d.WorkflowResources = []*resources.WorkflowResource{}
d.DynamicResources = []*resources.DynamicResource{}
d.ServiceResources = []*resources.ServiceResource{}
d.Graph = graph.NewGraph()
resourceCatalog := map[string]func() resources.ResourceInterface{
@@ -144,6 +160,16 @@ func (d *Workflow) ExtractFromPlantUML(plantUML multipart.File, request *tools.A
},
}
},
"Service": func() resources.ResourceInterface {
return &resources.ServiceResource{
AbstractInstanciatedResource: resources.AbstractInstanciatedResource[*resources.ServiceInstance]{
Instances: []*resources.ServiceInstance{},
},
}
},
"Dynamic": func() resources.ResourceInterface {
return &resources.DynamicResource{}
},
// WorkflowEvent creates a NativeTool of Kind=WORKFLOW_EVENT directly,
// without DB lookup. It has no user-defined instance.
"WorkflowEvent": func() resources.ResourceInterface {
@@ -229,9 +255,12 @@ func (d *Workflow) ExtractFromPlantUML(plantUML multipart.File, request *tools.A
}
d.generateResource(d.GetResources(tools.DATA_RESOURCE), request)
d.generateResource(d.GetResources(tools.PROCESSING_RESOURCE), request)
d.generateResource(d.GetResources(tools.SERVICE_RESOURCE), request)
d.generateResource(d.GetResources(tools.STORAGE_RESOURCE), request)
d.generateResource(d.GetResources(tools.COMPUTE_RESOURCE), request)
d.generateResource(d.GetResources(tools.WORKFLOW_RESOURCE), request)
d.generateResource(d.GetResources(tools.SERVICE_RESOURCE), request)
d.generateResource(d.GetResources(tools.DYNAMIC_RESOURCE), request)
d.Graph.Items = graphVarName
return d, nil
}
@@ -410,6 +439,14 @@ func (d *Workflow) getNewGraphItem(dataName string, resource resources.ResourceI
d.Processings = append(d.Processings, resource.GetID())
d.ProcessingResources = append(d.ProcessingResources, resource.(*resources.ProcessingResource))
graphItem.Processing = resource.(*resources.ProcessingResource)
case "Service":
d.Services = append(d.Services, resource.GetID())
d.ServiceResources = append(d.ServiceResources, resource.(*resources.ServiceResource))
graphItem.Service = resource.(*resources.ServiceResource)
case "Dynamic":
d.Dynamics = append(d.Dynamics, resource.GetID())
d.DynamicResources = append(d.DynamicResources, resource.(*resources.DynamicResource))
graphItem.Dynamic = resource.(*resources.DynamicResource)
case "WorkflowEvent":
// The resource is already a *NativeTool with Kind=WORKFLOW_EVENT set by the
// catalog factory. We use it directly without any DB lookup.
@@ -441,6 +478,8 @@ func (d *Workflow) getNewInstance(dataName string, name string, peerID string) r
return resources.NewStorageResourceInstance(name, peerID)
case "ComputeUnit":
return resources.NewComputeResourceInstance(name, peerID)
case "Service":
return resources.NewServiceInstance(name, peerID)
default:
return nil
}
@@ -627,7 +666,8 @@ func (wf *Workflow) Planify(start time.Time, end *time.Time, instances ConfigIte
priceds := map[tools.DataType]map[string]pricing.PricedItemITF{}
var err error
// 2. Plan processings first so we can derive the total workflow duration.
// 2. Plan processings and services first so we can derive the total workflow duration.
// Services in DEPLOYMENT mode return duration=-1 (open-ended); HOSTED mode returns a bounded call window.
ps, priceds, err := plan[*resources.ProcessingResource](tools.PROCESSING_RESOURCE, instances, partnerships, buyings, strategies, bookingMode, wf, priceds, request, wf.Graph.IsProcessing,
func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64, error) {
d, err := wf.Graph.GetAverageTimeProcessingBeforeStart(0, res.GetID(),
@@ -644,6 +684,24 @@ func (wf *Workflow) Planify(start time.Time, end *time.Time, instances ConfigIte
if err != nil {
return false, 0, priceds, nil, err
}
if _, priceds, err = plan[*resources.ServiceResource](tools.SERVICE_RESOURCE, instances, partnerships, buyings, strategies, bookingMode, wf, priceds, request, wf.Graph.IsService,
func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64, error) {
d, err := wf.Graph.GetAverageTimeProcessingBeforeStart(0, res.GetID(),
*instances.Get(res.GetID()), *partnerships.Get(res.GetID()), *buyings.Get(res.GetID()), *strategies.Get(res.GetID()),
bookingMode, request)
if err != nil {
return start, 0, err
}
return start.Add(time.Duration(d) * time.Second), priced.GetExplicitDurationInS(), nil
}, func(started time.Time, duration float64) (*time.Time, error) {
if duration < 0 {
return nil, nil // DEPLOYMENT mode: open-ended
}
s := started.Add(time.Duration(duration) * time.Second)
return &s, nil
}); err != nil {
return false, 0, priceds, nil, err
}
// Total workflow duration used as the booking window for compute/storage.
// Returns -1 if any processing is a service (open-ended).
@@ -793,7 +851,9 @@ func (w *Workflow) GetItemsByResources() map[tools.DataType]map[string][]string
tools.DATA_RESOURCE: func() []graph.GraphItem { return w.GetGraphItems(w.Graph.IsData) },
tools.COMPUTE_RESOURCE: func() []graph.GraphItem { return w.GetGraphItems(w.Graph.IsCompute) },
tools.PROCESSING_RESOURCE: func() []graph.GraphItem { return w.GetGraphItems(w.Graph.IsProcessing) },
tools.SERVICE_RESOURCE: func() []graph.GraphItem { return w.GetGraphItems(w.Graph.IsService) },
tools.WORKFLOW_RESOURCE: func() []graph.GraphItem { return w.GetGraphItems(w.Graph.IsWorkflow) },
tools.DYNAMIC_RESOURCE: func() []graph.GraphItem { return w.GetGraphItems(w.Graph.IsDynamic) },
}
for dt, meth := range dtMethodMap {

View File

@@ -219,6 +219,10 @@ func (a *workflowMongoAccessor) verifyResource(obj utils.DBObject) utils.DBObjec
access = resources.NewAccessor[*resources.WorkflowResource](t, a.GetRequest())
case tools.DATA_RESOURCE:
access = resources.NewAccessor[*resources.DataResource](t, a.GetRequest())
case tools.NATIVE_TOOL:
access = resources.NewAccessor[*resources.NativeTool](t, a.GetRequest())
case tools.SERVICE_RESOURCE:
access = resources.NewAccessor[*resources.ServiceResource](t, a.GetRequest())
default:
wf.Graph.Clear(resource.GetID())
}

View File

@@ -0,0 +1,168 @@
package workflow_execution
import (
"slices"
"time"
workflowgraph "cloud.o-forge.io/core/oc-lib/models/workflow/graph"
)
// ExecutionStepState is the runtime state of a single step in the execution graph.
type ExecutionStepState string
const (
StepWaiting ExecutionStepState = "waiting"
StepRunning ExecutionStepState = "running"
StepSuccess ExecutionStepState = "success"
StepFailure ExecutionStepState = "failure"
)
// ExecutionGraphItem is the summarized view of one node in the workflow execution graph.
//
// - Name : human-readable label (resource name or item ID as fallback)
// - StartDate : set when the step transitions to StepRunning
// - EndDate : set when the step transitions to StepSuccess or StepFailure
// - State : current lifecycle state of the step
// - Deps : itemIDs that must reach StepSuccess before this step can start
// - WhenRunning : itemIDs (resources) that become active while this step is running
// (e.g. the compute node executing it, the storage it reads/writes)
type ExecutionGraphItem struct {
Name string `json:"name" bson:"name"`
StartDate *time.Time `json:"start_date,omitempty" bson:"start_date,omitempty"`
EndDate *time.Time `json:"end_date,omitempty" bson:"end_date,omitempty"`
State ExecutionStepState `json:"state" bson:"state"`
Deps []string `json:"deps,omitempty" bson:"deps,omitempty"`
WhenRunning []string `json:"when_running,omitempty" bson:"when_running,omitempty"`
}
// ExecutionGraph is a flat, scheduler-friendly summary of a workflow execution graph.
// The map key is the workflow graph item ID.
type ExecutionGraph map[string]ExecutionGraphItem
// BuildExecutionGraph derives an initial ExecutionGraph (all steps in StepWaiting)
// from a workflow graph. It infers:
//   - Deps        : predecessor item IDs based on link direction
//   - WhenRunning : sibling item IDs connected to a step by a link
//     (i.e. resources that are co-active when the step runs)
//
// A nil graph yields an empty ExecutionGraph.
func BuildExecutionGraph(g *workflowgraph.Graph) ExecutionGraph {
	if g == nil {
		return ExecutionGraph{}
	}
	// deps[dst] = list of src item IDs that dst depends on
	deps := map[string][]string{}
	// whenRunning[id] = list of item IDs active while id is running
	whenRunning := map[string][]string{}
	for _, link := range g.Links {
		src := link.Source.ID
		dst := link.Destination.ID
		if src == "" || dst == "" {
			continue
		}
		// Links pointing at items missing from the graph are ignored.
		srcItem, srcOk := g.Items[src]
		dstItem, dstOk := g.Items[dst]
		if !srcOk || !dstOk {
			continue
		}
		// Steps (logical nodes that sequence execution): Data, Processing, Workflow, NativeTool.
		// Resources (infrastructure co-active while a step runs): Compute, Storage.
		// NOTE(review): Service and Dynamic graph items match neither category,
		// so links touching them are dropped here — confirm this is intended.
		srcIsStep := srcItem.Data != nil || srcItem.Processing != nil || srcItem.Workflow != nil || srcItem.NativeTool != nil
		dstIsStep := dstItem.Data != nil || dstItem.Processing != nil || dstItem.Workflow != nil || dstItem.NativeTool != nil
		srcIsResource := srcItem.Compute != nil || srcItem.Storage != nil
		dstIsResource := dstItem.Compute != nil || dstItem.Storage != nil
		switch {
		case srcIsStep && dstIsStep:
			// Sequential dependency: dst must wait for src to succeed.
			deps[dst] = appendUnique(deps[dst], src)
		case srcIsStep && dstIsResource:
			// src activates dst (compute/storage) while running.
			whenRunning[src] = appendUnique(whenRunning[src], dst)
		case srcIsResource && dstIsStep:
			// dst uses src (compute/storage) while running.
			whenRunning[dst] = appendUnique(whenRunning[dst], src)
		}
	}
	// Every graph item gets an entry, labeled with the resource name when
	// available and falling back to the item ID.
	eg := ExecutionGraph{}
	for id, item := range g.Items {
		name := id
		_, r := item.GetResource()
		if r != nil && r.GetName() != "" {
			name = r.GetName()
		}
		eg[id] = ExecutionGraphItem{
			Name:        name,
			State:       StepWaiting,
			Deps:        deps[id],
			WhenRunning: whenRunning[id],
		}
	}
	return eg
}
// MarkRunning transitions the step to StepRunning and records the start time.
// Unknown items and already-finished steps (success or failure) are left
// untouched.
func (eg ExecutionGraph) MarkRunning(itemID string, at time.Time) {
	step, found := eg[itemID]
	if !found {
		return
	}
	if step.State == StepSuccess || step.State == StepFailure {
		return
	}
	step.State = StepRunning
	step.StartDate = &at
	eg[itemID] = step
}
// MarkDone finalizes a step: StepSuccess when success is true, StepFailure
// otherwise, stamping the end time in both cases. Unknown items are ignored.
func (eg ExecutionGraph) MarkDone(itemID string, success bool, at time.Time) {
	step, found := eg[itemID]
	if !found {
		return
	}
	step.State = StepFailure
	if success {
		step.State = StepSuccess
	}
	step.EndDate = &at
	eg[itemID] = step
}
// Depssatisfied reports whether every dependency of the given item has
// reached StepSuccess. Unknown items — and unknown deps — count as
// unsatisfied.
func (eg ExecutionGraph) Depssatisfied(itemID string) bool {
	step, found := eg[itemID]
	if !found {
		return false
	}
	for _, depID := range step.Deps {
		if dep, ok := eg[depID]; !ok || dep.State != StepSuccess {
			return false
		}
	}
	return true
}
// ReadyToRun returns the IDs of all steps that are still waiting and whose deps
// are fully satisfied. Useful for the scheduler to decide what to start next.
// The result is sorted so callers observe a deterministic order despite Go's
// randomized map iteration.
func (eg ExecutionGraph) ReadyToRun() []string {
	ready := []string{}
	for id, item := range eg {
		if item.State == StepWaiting && eg.Depssatisfied(id) {
			ready = append(ready, id)
		}
	}
	slices.Sort(ready)
	return ready
}
// appendUnique appends val to slice unless it is already present,
// preserving insertion order.
func appendUnique(slice []string, val string) []string {
	if !slices.Contains(slice, val) {
		slice = append(slice, val)
	}
	return slice
}

View File

@@ -9,6 +9,7 @@ import (
"cloud.o-forge.io/core/oc-lib/models/booking"
"cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/common/pricing"
"cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/models/resources/purchase_resource"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/models/workflow"
@@ -47,10 +48,34 @@ type WorkflowExecution struct {
BookingsState map[string]BookingState `json:"bookings_state" bson:"bookings_state,omitempty"` // booking_id → reservation+completion status
PurchasesState map[string]bool `json:"purchases_state" bson:"purchases_state,omitempty"` // purchase_id → confirmed
// Graph is a lightweight, real-time summary of the workflow execution graph.
// Keyed by workflow graph item ID; updated by oc-scheduler on each step-done event.
// Consumed by oc-front to render the live execution panel via websocket updates.
Graph ExecutionGraph `json:"graph,omitempty" bson:"graph,omitempty"`
SelectedInstances workflow.ConfigItem `json:"selected_instances"`
SelectedPartnerships workflow.ConfigItem `json:"selected_partnerships"`
SelectedBuyings workflow.ConfigItem `json:"selected_buyings"`
SelectedStrategies workflow.ConfigItem `json:"selected_strategies"`
// SelectedEmbeddedStorages records which storage capability was activated on
// each compute unit graph node (key = compute graph node ID).
// Populated by oc-scheduler, consumed by oc-monitord's argo builder.
SelectedEmbeddedStorages map[string]*resources.EmbeddedStorageSelection `json:"selected_embedded_storages,omitempty" bson:"selected_embedded_storages,omitempty"`
}
func (ri *WorkflowExecution) Extend(typ ...string) map[string][]tools.DataType {
ext := ri.AbstractObject.Extend(typ...)
for _, t := range typ {
switch t {
case "workflow":
if _, ok := ext[t]; !ok {
ext[t] = []tools.DataType{}
}
ext[t] = append(ext[t], tools.PEER)
}
}
return ext
}
func (r *WorkflowExecution) StoreDraftDefault() {
@@ -141,6 +166,7 @@ use of a datacenter or storage can't be buy for permanent access.
func (d *WorkflowExecution) Buy(bs pricing.BillingStrategy, executionsID string, wfID string, priceds map[tools.DataType]map[string]pricing.PricedItemITF) []*purchase_resource.PurchaseResource {
purchases := d.buyEach(bs, executionsID, wfID, tools.PROCESSING_RESOURCE, priceds[tools.PROCESSING_RESOURCE])
purchases = append(purchases, d.buyEach(bs, executionsID, wfID, tools.DATA_RESOURCE, priceds[tools.DATA_RESOURCE])...)
purchases = append(purchases, d.buyEach(bs, executionsID, wfID, tools.SERVICE_RESOURCE, priceds[tools.SERVICE_RESOURCE])...)
d.PurchasesState = map[string]bool{}
for _, p := range purchases {
d.PurchasesState[p.GetID()] = false
@@ -170,7 +196,11 @@ func (d *WorkflowExecution) buyEach(bs pricing.BillingStrategy, executionsID str
var m map[string]interface{}
b, _ := json.Marshal(priced)
json.Unmarshal(b, &m)
end := start.Add(time.Duration(priced.GetExplicitDurationInS()) * time.Second)
var endDate *time.Time
if durS := priced.GetExplicitDurationInS(); durS > 0 {
e := start.Add(time.Duration(durS) * time.Second)
endDate = &e
}
bookingItem := &purchase_resource.PurchaseResource{
AbstractObject: utils.AbstractObject{
UUID: uuid.New().String(),
@@ -184,7 +214,7 @@ func (d *WorkflowExecution) buyEach(bs pricing.BillingStrategy, executionsID str
ResourceID: priced.GetID(),
InstanceID: priced.GetInstanceID(),
ResourceType: dt,
EndDate: &end,
EndDate: endDate,
}
items = append(items, bookingItem)
d.PeerBuyByGraph[priced.GetCreatorID()][itemID] = append(
@@ -196,6 +226,7 @@ func (d *WorkflowExecution) buyEach(bs pricing.BillingStrategy, executionsID str
func (d *WorkflowExecution) Book(executionsID string, wfID string, priceds map[tools.DataType]map[string]pricing.PricedItemITF) []*booking.Booking {
booking := d.bookEach(executionsID, wfID, tools.STORAGE_RESOURCE, priceds[tools.STORAGE_RESOURCE])
booking = append(booking, d.bookEach(executionsID, wfID, tools.PROCESSING_RESOURCE, priceds[tools.PROCESSING_RESOURCE])...)
booking = append(booking, d.bookEach(executionsID, wfID, tools.SERVICE_RESOURCE, priceds[tools.SERVICE_RESOURCE])...)
booking = append(booking, d.bookEach(executionsID, wfID, tools.COMPUTE_RESOURCE, priceds[tools.COMPUTE_RESOURCE])...)
booking = append(booking, d.bookEach(executionsID, wfID, tools.DATA_RESOURCE, priceds[tools.DATA_RESOURCE])...)
for _, p := range booking {

View File

@@ -33,6 +33,9 @@ const (
NATIVE_TOOL
EXECUTION_VERIFICATION
ALLOWED_IMAGE
SERVICE_RESOURCE
DYNAMIC_RESOURCE
LIVE_SERVICE
)
var NOAPI = func() string {
@@ -90,6 +93,9 @@ var InnerDefaultAPI = [...]func() string{
CATALOGAPI,
SCHEDULERAPI,
DATACENTERAPI,
CATALOGAPI,
CATALOGAPI,
DATACENTERAPI,
}
// Bind the standard data name to the data type
@@ -117,6 +123,9 @@ var Str = [...]string{
"native_tool",
"execution_verification",
"allowed_image",
"service_resource",
"dynamic_resource",
"live_service",
}
func FromString(comp string) int {
@@ -152,7 +161,7 @@ func DataTypeList() []DataType {
return []DataType{DATA_RESOURCE, PROCESSING_RESOURCE, STORAGE_RESOURCE, COMPUTE_RESOURCE, WORKFLOW_RESOURCE,
WORKFLOW, WORKFLOW_EXECUTION, WORKSPACE, PEER, COLLABORATIVE_AREA, RULE, BOOKING, WORKFLOW_HISTORY, WORKSPACE_HISTORY,
ORDER, PURCHASE_RESOURCE,
LIVE_DATACENTER, LIVE_STORAGE, BILL, NATIVE_TOOL, EXECUTION_VERIFICATION, ALLOWED_IMAGE}
LIVE_DATACENTER, LIVE_STORAGE, BILL, NATIVE_TOOL, EXECUTION_VERIFICATION, ALLOWED_IMAGE, SERVICE_RESOURCE, DYNAMIC_RESOURCE, LIVE_SERVICE}
}
type PropalgationMessage struct {
@@ -177,6 +186,11 @@ const (
PB_PVC_CONFIG
PB_CLOSE_SEARCH
NONE
PB_OBSERVE
PB_OBSERVE_CLOSE
// PB_PROPAGATE is used by oc-discovery to broadcast a peer's online/offline
// state to other oc-discovery nodes in the federation via PROPALGATION_EVENT.
PB_PROPAGATE
)
func GetActionString(ss string) PubSubAction {
@@ -203,14 +217,40 @@ func GetActionString(ss string) PubSubAction {
return PB_MINIO_CONFIG
case "close_search":
return PB_CLOSE_SEARCH
case "observe":
return PB_OBSERVE
case "observe_close":
return PB_OBSERVE_CLOSE
case "propagate":
return PB_PROPAGATE
default:
return NONE
}
}
var path = []string{"search", "search_response", "create", "update", "delete", "planner", "close_planner",
"considers", "admiralty_config", "minio_config", "close_search"}
// path aligns with PubSubAction iota values for String().
var path = []string{
"search", // 0 PB_SEARCH
"search_response", // 1 PB_SEARCH_RESPONSE
"create", // 2 PB_CREATE
"update", // 3 PB_UPDATE
"delete", // 4 PB_DELETE
"planner", // 5 PB_PLANNER
"close_planner", // 6 PB_CLOSE_PLANNER
"considers", // 7 PB_CONSIDERS
"admiralty_config", // 8 PB_ADMIRALTY_CONFIG
"minio_config", // 9 PB_MINIO_CONFIG
"pvc_config", // 10 PB_PVC_CONFIG
"close_search", // 11 PB_CLOSE_SEARCH
"none", // 12 NONE
"observe", // 13 PB_OBSERVE
"observe_close", // 14 PB_OBSERVE_CLOSE
"propagate", // 15 PB_PROPAGATE
}
func (m PubSubAction) String() string {
if int(m) >= len(path) {
return "unknown"
}
return strings.ToUpper(path[m])
}

View File

@@ -31,7 +31,7 @@ var meths = []string{"remove execution", "create execution", "planner execution"
"propalgation event", "search event", "confirm event",
"considers event", "admiralty config event", "minio config event", "pvc config event",
"workflow started event", "workflow step done event", "workflow done event",
"peer behavior event",
"peer behavior event", "peer observe response event", "peer observe event",
}
const (
@@ -68,6 +68,16 @@ const (
// oc-discovery consumes it to update the peer's trust score and auto-blacklist
// below threshold.
PEER_BEHAVIOR_EVENT
// PEER_OBSERVE_RESPONSE_EVENT is emitted by oc-discovery each time it receives
// a heartbeat from an observed remote peer. oc-peer listens to this event to
// update the WS connectivity state for its clients.
PEER_OBSERVE_RESPONSE_EVENT
// PEER_OBSERVE_EVENT is emitted by oc-peer to request oc-discovery to start
// or stop observing a remote peer. Payload contains the target peer_id and a
// boolean close flag.
PEER_OBSERVE_EVENT
)
func (n NATSMethod) String() string {
@@ -79,7 +89,8 @@ func NameToMethod(name string) NATSMethod {
for _, v := range [...]NATSMethod{REMOVE_EXECUTION, CREATE_EXECUTION, PLANNER_EXECUTION, DISCOVERY, WORKFLOW_EVENT, ARGO_KUBE_EVENT,
CREATE_RESOURCE, REMOVE_RESOURCE, PROPALGATION_EVENT, SEARCH_EVENT, CONFIRM_EVENT,
CONSIDERS_EVENT, ADMIRALTY_CONFIG_EVENT, MINIO_CONFIG_EVENT, PVC_CONFIG_EVENT,
WORKFLOW_STARTED_EVENT, WORKFLOW_STEP_DONE_EVENT, WORKFLOW_DONE_EVENT} {
WORKFLOW_STARTED_EVENT, WORKFLOW_STEP_DONE_EVENT, WORKFLOW_DONE_EVENT,
PEER_BEHAVIOR_EVENT, PEER_OBSERVE_RESPONSE_EVENT, PEER_OBSERVE_EVENT} {
if strings.Contains(strings.ToLower(v.String()), strings.ToLower(name)) {
return v
}