66 Commits

Author SHA1 Message Date
mr
d9b1ad8dde ToMaster 2026-04-23 11:40:13 +02:00
mr
d6106dacde FromNano 2026-04-23 11:36:39 +02:00
mr
365a1d670c from_nano for booking 2026-04-23 11:19:23 +02:00
mr
25880077d1 pending_master 2026-04-23 11:08:02 +02:00
mr
560c997bf1 pending nano for nano flow. 2026-04-23 10:33:51 +02:00
mr
747368c79a nano 2026-04-23 10:16:13 +02:00
mr
e5e5706834 master role 2026-04-23 10:11:41 +02:00
mr
b9ad5d5ea7 is_nano 2026-04-23 10:04:12 +02:00
mr
e70e89b630 Api Struct + Nano env 2026-04-23 09:48:39 +02:00
mr
9c2663601a Service + Storage Binded to Compute 2026-04-23 09:24:02 +02:00
mr
538496cd60 debug cache 2026-04-22 14:13:28 +02:00
mr
a4366d3a09 follow purchase 2026-04-22 11:54:16 +02:00
mr
51e2dcc404 Load One pb 2026-04-22 11:47:08 +02:00
mr
c208e2ccef sub delete for loop 2026-04-22 11:38:21 +02:00
mr
5cda4fdd40 missed placed 2026-04-22 11:25:32 +02:00
mr
b92634ccba debug extend resource 2026-04-22 11:15:39 +02:00
mr
da237b1d26 oclib 2026-04-22 10:55:52 +02:00
mr
94e3ebbdd9 temp by pass purchase 2026-04-22 10:24:06 +02:00
mr
6741e929cc purchase as string 2026-04-22 09:48:16 +02:00
mr
a08c9b084d GetExtends adjust 2026-04-22 09:18:08 +02:00
mr
17a45eb5d1 GetExtends 2026-04-22 09:14:21 +02:00
mr
0c6efee276 Kick extend treatment 2026-04-21 14:45:04 +02:00
mr
bbaea4fec4 extended for load all + search all 2026-04-21 14:36:52 +02:00
mr
d57ee0b5e7 Extend for Human Readable 2026-04-21 14:30:45 +02:00
mr
50a5e90f33 Native access debug 2026-04-21 08:16:04 +02:00
mr
5cc04ee490 oc-lib 2026-04-17 09:45:00 +02:00
mr
883c0bec3d graph 2026-04-16 15:19:36 +02:00
mr
dc0041999d Change bus 2026-04-14 12:46:22 +02:00
mr
a653f9495b lock caller 2026-04-13 15:08:06 +02:00
mr
d7b2ef6ae1 Prep Status 2026-04-10 09:57:51 +02:00
mr
878885c8c8 pricing profile payment mode + workflow 2026-04-09 16:14:44 +02:00
mr
c340146c8d naming 2026-04-09 08:54:42 +02:00
mr
92eb2663bc add purchase info 2026-04-08 16:34:21 +02:00
mr
284533ad1d Simplify & Live Bug 2026-04-08 15:40:44 +02:00
mr
dbbad79480 Resource Buy & Limitation 2026-04-08 15:18:20 +02:00
mr
046bde17d4 format Date for horrible date name 2026-04-08 15:09:32 +02:00
mr
6fe91eda87 NATIVE_TOOL 2026-04-07 11:09:27 +02:00
mr
526eaef33a could not load 2026-04-07 11:03:36 +02:00
mr
b7ee6d8e7f kick canDelete 2026-04-07 09:54:48 +02:00
mr
5dbe55e630 StoreDraftDefault skip 2026-04-07 09:36:31 +02:00
mr
2e9f4cb9f4 can delete + search 2026-04-07 08:32:42 +02:00
mr
3ad0a69f54 default on serialization 2026-04-03 17:34:43 +02:00
mr
2a6d3880cd useless print 2026-04-03 16:54:20 +02:00
mr
316ebc93f9 change 2026-04-03 16:37:39 +02:00
mr
913d9b3dfb oclib then 2026-04-03 14:18:07 +02:00
mr
450e917227 PeerGrouping defaulting on access all 2026-04-03 10:36:48 +02:00
mr
54985bbc45 array missing 2026-04-02 14:55:06 +02:00
mr
4f0714cb11 Get ENV INPUTS OUTPUT 2026-04-02 14:45:51 +02:00
mr
a2f6f3c252 ENV, Input, Outpu Expose, Container change of rules 2026-04-02 14:31:19 +02:00
mr
2bc4555793 entrypoint 2026-04-02 10:01:26 +02:00
mr
ad12f02a70 Rights Behaviors 2026-04-02 09:43:04 +02:00
mr
20cac09f9d Add SetNotInCatalog 2026-04-01 13:04:47 +02:00
mr
f3b5a54545 location 2026-03-31 20:19:01 +02:00
mr
c0722483b8 IsNot in catalog strategy 2026-03-31 16:41:12 +02:00
mr
0aee593f29 Not in catalog strategy 2026-03-31 16:40:30 +02:00
mr
a4ab3285e3 Add attr inspired by docker 2026-03-30 10:21:09 +02:00
mr
45f2351b2f OC LIB -> EXTRA 2026-03-27 12:41:31 +01:00
mr
39cb1c715c debug filter on catalog 2026-03-27 12:14:15 +01:00
mr
87cf2cb12a Booking State 2026-03-26 12:02:03 +01:00
mr
4580200e80 Allowed_image 2026-03-25 10:20:16 +01:00
mr
6d0c78946e Peerless + New Argo 2026-03-24 12:49:37 +01:00
mr
211339947c kubernetes + podchaperon 2026-03-23 16:20:20 +01:00
mr
b76b22a8fb Pv + Pvc for admiralty purpose 2026-03-23 12:29:35 +01:00
mr
fa9893e150 pvc immediate 2026-03-23 12:16:29 +01:00
mr
14b449f547 Fusion + Nats Complement 2026-03-23 11:53:21 +01:00
mr
5b197c91e0 Add CreatePVC and DeletePVC to KubernetesService
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-23 11:42:58 +01:00
62 changed files with 1760 additions and 328 deletions

View File

@@ -9,6 +9,9 @@ import "sync"
// ===================================================
type Config struct {
IsApi bool
IsNano bool
APIPort int
NATSUrl string
MongoUrl string
@@ -48,10 +51,13 @@ func GetConfig() *Config {
return instance
}
func SetConfig(mongoUrl string, database string, natsUrl string, lokiUrl string, logLevel string, port int,
func SetConfig(isNano bool, isAPI bool, mongoUrl string, database string, natsUrl string, lokiUrl string, logLevel string, port int,
pkPath, ppPath,
internalCatalogAPI, internalSharedAPI, internalWorkflowAPI, internalWorkspaceAPI,
internalPeerAPI, internalDatacenterAPI string, internalSchedulerAPI string) *Config {
GetConfig().IsNano = isNano
GetConfig().IsApi = isAPI
GetConfig().MongoUrl = mongoUrl
GetConfig().MongoDatabase = database
GetConfig().NATSUrl = natsUrl

View File

@@ -2,6 +2,7 @@ package dbs
import (
"fmt"
"reflect"
"runtime/debug"
"strings"
@@ -102,12 +103,12 @@ func GetBson(filters *Filters) bson.D {
}
}
if len(orList) > 0 && len(andList) == 0 {
f = bson.D{{"$or", orList}}
f = bson.D{{Key: "$or", Value: orList}}
} else {
if len(orList) > 0 {
andList = append(andList, bson.M{"$or": orList})
}
f = bson.D{{"$and", andList}}
f = bson.D{{Key: "$and", Value: andList}}
}
}
return f
@@ -148,6 +149,137 @@ type Filter struct {
Value interface{} `json:"value,omitempty"`
}
// FiltersFromFlatMap builds a *Filters from a map[string]interface{} whose structure
// mirrors the JSON form of Filters:
//
// {
// "and": { "name": [{"operator":"like","value":"foo"}] },
// "or": { "source": [{"operator":"equal","value":"bar"}] }
// }
//
// Keys inside "and"/"or" are json tag names; the function resolves each to its
// full dotted BSON path using the target struct. Unknown keys are kept as-is.
// FiltersFromFlatMap builds a *Filters from a map[string]interface{} whose
// structure mirrors the JSON form of Filters:
//
//	{
//	  "and": { "name":   [{"operator":"like","value":"foo"}] },
//	  "or":  { "source": [{"operator":"equal","value":"bar"}] }
//	}
//
// Keys inside "and"/"or" are json tag names; each is resolved to its full
// dotted BSON path using the target struct. Unknown keys are kept as-is.
// Sections or entries that are not shaped as above (non-map sections,
// non-slice values, non-map items) are silently skipped.
//
// FIX: a nil target previously panicked inside jsonToBsonPaths
// (reflect.TypeOf(nil) yields a nil reflect.Type); it now just disables
// path resolution and keeps every key as-is.
func FiltersFromFlatMap(flatMap map[string]interface{}, target interface{}) *Filters {
	filters := &Filters{
		And: make(map[string][]Filter),
		Or:  make(map[string][]Filter),
	}
	paths := map[string]string{}
	if target != nil {
		paths = jsonToBsonPaths(reflect.TypeOf(target), "")
	}
	// resolve maps a json tag name to its dotted BSON path, falling back to
	// the key itself when the target struct does not declare it.
	resolve := func(jsonKey string) string {
		if p, ok := paths[jsonKey]; ok {
			return p
		}
		return jsonKey
	}
	// parseFilters converts one "and"/"or" section into keyed Filter lists.
	parseFilters := func(raw interface{}) map[string][]Filter {
		out := make(map[string][]Filter)
		m, ok := raw.(map[string]interface{})
		if !ok {
			return out
		}
		for jsonKey, val := range m {
			bsonKey := resolve(jsonKey)
			items, ok := val.([]interface{})
			if !ok {
				continue
			}
			for _, item := range items {
				entry, ok := item.(map[string]interface{})
				if !ok {
					continue
				}
				f := Filter{}
				if op, ok := entry["operator"].(string); ok {
					f.Operator = op
				}
				if v, ok := entry["value"]; ok {
					f.Value = v
				}
				out[bsonKey] = append(out[bsonKey], f)
			}
		}
		return out
	}
	if and, ok := flatMap["and"]; ok {
		filters.And = parseFilters(and)
	}
	if or, ok := flatMap["or"]; ok {
		filters.Or = parseFilters(or)
	}
	return filters
}
// jsonToBsonPaths recursively walks a struct type and returns a map of
// json_name → dotted_bson_path for every field reachable from that type.
//
// Anonymous embedded fields without any tag follow the BSON convention of this
// codebase: they are stored as a nested sub-document whose key is the lowercased
// struct type name (e.g. utils.AbstractObject → "abstractobject").
func jsonToBsonPaths(t reflect.Type, prefix string) map[string]string {
for t.Kind() == reflect.Ptr || t.Kind() == reflect.Slice {
t = t.Elem()
}
result := make(map[string]string)
if t.Kind() != reflect.Struct {
return result
}
for i := 0; i < t.NumField(); i++ {
field := t.Field(i)
jsonTag := field.Tag.Get("json")
bsonTag := field.Tag.Get("bson")
jsonName := strings.Split(jsonTag, ",")[0]
bsonName := strings.Split(bsonTag, ",")[0]
// Anonymous embedded struct with no tags: use lowercase type name as BSON prefix.
if field.Anonymous && jsonName == "" && bsonName == "" {
ft := field.Type
for ft.Kind() == reflect.Ptr {
ft = ft.Elem()
}
if ft.Kind() == reflect.Struct {
embedPrefix := strings.ToLower(ft.Name())
if prefix != "" {
embedPrefix = prefix + "." + embedPrefix
}
for k, v := range jsonToBsonPaths(ft, embedPrefix) {
if _, exists := result[k]; !exists {
result[k] = v
}
}
}
continue
}
if jsonName == "" || jsonName == "-" {
continue
}
if bsonName == "" || bsonName == "-" {
bsonName = jsonName
}
fullPath := bsonName
if prefix != "" {
fullPath = prefix + "." + bsonName
}
result[jsonName] = fullPath
ft := field.Type
for ft.Kind() == reflect.Ptr || ft.Kind() == reflect.Slice {
ft = ft.Elem()
}
if ft.Kind() == reflect.Struct {
for k, v := range jsonToBsonPaths(ft, fullPath) {
if _, exists := result[k]; !exists {
result[k] = v
}
}
}
}
return result
}
type Input = map[string]interface{}
func InputToBson(i Input, isUpdate bool) bson.D {

View File

@@ -282,12 +282,11 @@ func (m *MongoDB) LoadOne(id string, collection_name string) (*mongo.SingleResul
return res, 200, nil
}
func (m *MongoDB) Search(filters *dbs.Filters, collection_name string) (*mongo.Cursor, int, error) {
func (m *MongoDB) Search(filters *dbs.Filters, collection_name string, offset int64, limit int64) (*mongo.Cursor, int, error) {
if err := m.createClient(mngoConfig.GetUrl(), false); err != nil {
return nil, 503, err
}
opts := options.Find()
opts.SetLimit(1000)
targetDBCollection := CollectionMap[collection_name]
if targetDBCollection == nil {
return nil, 503, errors.New("collection " + collection_name + " not initialized")
@@ -295,6 +294,9 @@ func (m *MongoDB) Search(filters *dbs.Filters, collection_name string) (*mongo.C
f := dbs.GetBson(filters)
opts.SetSkip(offset) // OFFSET
opts.SetLimit(limit) // LIMIT
MngoCtx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
// defer cancel()
if cursor, err := targetDBCollection.Find(
@@ -329,7 +331,8 @@ func (m *MongoDB) LoadFilter(filter map[string]interface{}, collection_name stri
return res, 200, nil
}
func (m *MongoDB) LoadAll(collection_name string) (*mongo.Cursor, int, error) {
func (m *MongoDB) LoadAll(collection_name string, offset int64, limit int64) (*mongo.Cursor, int, error) {
if err := m.createClient(mngoConfig.GetUrl(), false); err != nil {
return nil, 503, err
}
@@ -337,8 +340,10 @@ func (m *MongoDB) LoadAll(collection_name string) (*mongo.Cursor, int, error) {
MngoCtx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
//defer cancel()
res, err := targetDBCollection.Find(MngoCtx, bson.D{})
findOptions := options.Find()
findOptions.SetSkip(offset) // OFFSET
findOptions.SetLimit(limit) // LIMIT
res, err := targetDBCollection.Find(MngoCtx, bson.D{}, findOptions)
if err != nil {
// m.Logger.Error().Msg("Couldn't find any resources. Error : " + err.Error())
return nil, 404, err

View File

@@ -64,8 +64,13 @@ const (
PURCHASE_RESOURCE = tools.PURCHASE_RESOURCE
NATIVE_TOOL = tools.NATIVE_TOOL
EXECUTION_VERIFICATION = tools.EXECUTION_VERIFICATION
ALLOWED_IMAGE = tools.ALLOWED_IMAGE
)
// FiltersFromFlatMap re-exports dbs.FiltersFromFlatMap so library users can
// build a *dbs.Filters from a flat JSON-shaped map ("and"/"or" sections of
// operator/value entries) without importing the dbs package directly.
// See dbs.FiltersFromFlatMap for the expected map structure.
func FiltersFromFlatMap(flatMap map[string]interface{}, target interface{}) *dbs.Filters {
	return dbs.FiltersFromFlatMap(flatMap, target)
}
func GetMySelf() (*peer.Peer, error) {
pp, err := utils.GetMySelf((&peer.Peer{}).GetAccessor(&tools.APIRequest{Admin: true}))
if pp == nil {
@@ -150,6 +155,9 @@ func InitDaemon(appName string) {
// resources.InitNative()
// feed the library with the loaded config
SetConfig(
o.GetBoolDefault("IS_NANO", false),
o.GetBoolDefault("IS_API", true),
o.GetStringDefault("MONGO_URL", "mongodb://127.0.0.1:27017"),
o.GetStringDefault("MONGO_DATABASE", "DC_myDC"),
o.GetStringDefault("NATS_URL", "nats://localhost:4222"),
@@ -167,11 +175,13 @@ func InitDaemon(appName string) {
o.GetStringDefault("INTERNAL_DATACENTER_API", "oc-datacenter"),
o.GetStringDefault("INTERNAL_SCHEDULER_API", "oc-scheduler"),
)
if config.GetConfig().IsApi {
// Beego init
beego.BConfig.AppName = appName
beego.BConfig.Listen.HTTPPort = o.GetIntDefault("port", 8080)
beego.BConfig.WebConfig.DirectoryIndex = true
beego.BConfig.WebConfig.StaticDir["/swagger"] = "swagger"
}
}
type IDTokenClaims struct {
@@ -191,6 +201,46 @@ type Claims struct {
Session SessionClaims `json:"session"`
}
// GetExtends serializes and enriches a whole batch of objects, sharing one
// load cache across the batch so a document referenced by several objects is
// only fetched once. See GetExtend for the per-object behavior.
func GetExtends(objs []utils.ShallowDBObject, typ ...string) []map[string]interface{} {
	sharedCache := map[tools.DataType]map[string]interface{}{}
	serialized := []map[string]interface{}{}
	for _, o := range objs {
		serialized = append(serialized, GetExtend(o, o.Extend(typ...), sharedCache))
	}
	return serialized
}
// GetExtend serializes obj and enriches the result in place: for every
// extension key k in extends, it resolves the identifier stored under
// base[k+"_id"] against each candidate data type and stores the serialized
// referenced document under base[k].
//
// cache memoizes loaded documents per (DataType, id) so repeated references
// across a batch (see GetExtends) are only fetched once.
//
// FIX: removed leftover debug fmt.Println calls and hoisted the repeated
// fmt.Sprintf of the id out of the inner loop. Logic is otherwise unchanged.
func GetExtend(obj utils.DBObject, extends map[string][]tools.DataType, cache map[tools.DataType]map[string]interface{}) map[string]interface{} {
	base := obj.Serialize(obj)
	for k, candidates := range extends {
		raw := base[k+"_id"]
		if raw == nil || raw == "" {
			continue // nothing to resolve for this extension key
		}
		id := fmt.Sprintf("%v", raw)
		for _, dt := range candidates {
			// Cache hit: reuse the previously serialized document.
			// NOTE(review): as in the original, a cache hit continues scanning
			// the remaining candidate types while a fresh load breaks out;
			// that asymmetry is preserved here — confirm it is intentional.
			if byID := cache[dt]; byID != nil && byID[id] != nil {
				base[k] = byID[id]
				continue
			}
			d, _, err := models.Model(dt.EnumIndex()).GetAccessor(&tools.APIRequest{
				Admin: true,
			}).LoadOne(id)
			if d == nil || err != nil {
				continue // try the next candidate type
			}
			base[k] = d.Serialize(d)
			if cache[dt] == nil {
				cache[dt] = map[string]interface{}{}
			}
			if cache[dt][id] == nil {
				cache[dt][id] = base[k]
			}
			break
		}
	}
	return base
}
func ExtractTokenInfo(request http.Request) (string, string, []string) {
reqToken := request.Header.Get("Authorization")
splitToken := strings.Split(reqToken, "Bearer ")
@@ -200,25 +250,42 @@ func ExtractTokenInfo(request http.Request) (string, string, []string) {
reqToken = splitToken[1]
}
if reqToken != "" {
token := strings.Split(reqToken, ".")
if len(token) > 2 {
bytes, err := base64.StdEncoding.DecodeString(token[2])
if err != nil {
return "", "", []string{}
}
var c Claims
err = json.Unmarshal(bytes, &c)
if err != nil {
return "", "", []string{}
}
return c.Session.IDToken.UserID, c.Session.IDToken.PeerID, c.Session.IDToken.Groups
}
return extractFromToken(reqToken, "user_id"), extractFromToken(reqToken, "peer_id"), strings.Split(extractFromToken(reqToken, "groups"), ",")
}
return "", "", []string{}
}
func InitAPI(appName string) {
// extractFromToken decodes the payload (second dot-separated segment) of a
// JWT-like token and returns the string value stored at claims["ext"][attr].
// It returns "" for malformed tokens, undecodable payloads, payloads without
// an "ext" object, or non-string attribute values.
//
// FIX: the result variable was misleadingly named peerID although the
// function extracts any attribute (user_id, peer_id, groups, ...).
func extractFromToken(token string, attr string) string {
	parts := strings.Split(token, ".")
	if len(parts) < 2 {
		return ""
	}
	payload := parts[1]
	// JWT payloads are base64url without padding; restore the padding so the
	// padded URLEncoding decoder accepts both padded and unpadded inputs.
	switch len(payload) % 4 {
	case 2:
		payload += "=="
	case 3:
		payload += "="
	}
	b, err := base64.URLEncoding.DecodeString(payload)
	if err != nil {
		return ""
	}
	var claims map[string]interface{}
	if err := json.Unmarshal(b, &claims); err != nil {
		return ""
	}
	ext, ok := claims["ext"].(map[string]interface{})
	if !ok {
		return ""
	}
	val, _ := ext[attr].(string) // "" when missing or not a string
	return val
}
func InitAPI(appName string, extraRoutes ...map[string][]string) {
InitDaemon(appName)
if config.GetConfig().IsApi {
beego.BConfig.Listen.HTTPPort = config.GetConfig().APIPort
beego.BConfig.WebConfig.DirectoryIndex = true
beego.BConfig.WebConfig.StaticDir["/swagger"] = "swagger"
@@ -230,8 +297,10 @@ func InitAPI(appName string) {
AllowCredentials: true,
})
beego.InsertFilter("*", beego.BeforeRouter, c)
api := &tools.API{}
api.Discovered(beego.BeeApp.Handlers.GetAllControllerInfo())
api.Discovered(beego.BeeApp.Handlers.GetAllControllerInfo(), extraRoutes...)
}
}
//
@@ -253,11 +322,11 @@ func GetLogger() zerolog.Logger {
* @param logLevel string
* @return *Config
*/
func SetConfig(mongoUrl string, database string, natsUrl string, lokiUrl string, logLevel string,
func SetConfig(isNano bool, isApi bool, mongoUrl string, database string, natsUrl string, lokiUrl string, logLevel string,
port int, pppath string, pkpath string,
internalCatalogAPI, internalSharedAPI, internalWorkflowAPI,
internalWorkspaceAPI, internalPeerAPI, internalDatacenterAPI string, internalSchedulerAPI string) *config.Config {
cfg := config.SetConfig(mongoUrl, database, natsUrl, lokiUrl, logLevel, port, pkpath, pppath, internalCatalogAPI, internalSharedAPI, internalWorkflowAPI,
cfg := config.SetConfig(isNano, isApi, mongoUrl, database, natsUrl, lokiUrl, logLevel, port, pkpath, pppath, internalCatalogAPI, internalSharedAPI, internalWorkflowAPI,
internalWorkspaceAPI, internalPeerAPI, internalDatacenterAPI, internalSchedulerAPI)
defer func() {
if r := recover(); r != nil {
@@ -341,7 +410,7 @@ func (r *Request) PaymentTunnel(o *order.Order, scheduler *workflow_execution.Wo
* @param c ...*tools.HTTPCaller
* @return data LibDataShallow
*/
func (r *Request) Search(filters *dbs.Filters, word string, isDraft bool) (data LibDataShallow) {
func (r *Request) Search(filters *dbs.Filters, word string, isDraft bool, offset int64, limit int64) (data LibDataShallow) {
defer func() { // recover the panic
if r := recover(); r != nil {
tools.UncatchedError = append(tools.UncatchedError, errors.New("Panic recovered in Search : "+fmt.Sprintf("%v", r)))
@@ -354,7 +423,7 @@ func (r *Request) Search(filters *dbs.Filters, word string, isDraft bool) (data
PeerID: r.PeerID,
Groups: r.Groups,
Admin: r.admin,
}).Search(filters, word, isDraft)
}).Search(filters, word, isDraft, offset, limit)
if err != nil {
data = LibDataShallow{Data: d, Code: code, Err: err.Error()}
return
@@ -369,7 +438,7 @@ func (r *Request) Search(filters *dbs.Filters, word string, isDraft bool) (data
* @param c ...*tools.HTTPCaller
* @return data LibDataShallow
*/
func (r *Request) LoadAll(isDraft bool) (data LibDataShallow) {
func (r *Request) LoadAll(isDraft bool, offset int64, limit int64) (data LibDataShallow) {
defer func() { // recover the panic
if r := recover(); r != nil {
tools.UncatchedError = append(tools.UncatchedError, errors.New("Panic recovered in LoadAll : "+fmt.Sprintf("%v", r)+" - "+string(debug.Stack())))
@@ -382,7 +451,7 @@ func (r *Request) LoadAll(isDraft bool) (data LibDataShallow) {
PeerID: r.PeerID,
Groups: r.Groups,
Admin: r.admin,
}).LoadAll(isDraft)
}).LoadAll(isDraft, offset, limit)
if err != nil {
data = LibDataShallow{Data: d, Code: code, Err: err.Error()}
return
@@ -712,7 +781,7 @@ func InitNATSDecentralizedEmitter(authorizedDT ...tools.DataType) {
return // don't trust anyone... only friends and foes are privilege
}
access := NewRequestAdmin(LibDataEnum(resp.Datatype), nil)
if data := access.Search(nil, fmt.Sprintf("%v", p[resp.SearchAttr]), false); len(data.Data) > 0 {
if data := access.Search(nil, fmt.Sprintf("%v", p[resp.SearchAttr]), false, 0, 1); len(data.Data) > 0 {
delete(p, "id")
access.UpdateOne(p, data.Data[0].GetID())
} else {
@@ -731,8 +800,8 @@ func InitNATSDecentralizedEmitter(authorizedDT ...tools.DataType) {
access := NewRequestAdmin(LibDataEnum(resp.Datatype), nil)
err := json.Unmarshal(resp.Payload, &p)
if err == nil {
if data := access.Search(nil, fmt.Sprintf("%v", p[resp.SearchAttr]), false); len(data.Data) > 0 {
access.DeleteOne(fmt.Sprintf("%v", p[resp.SearchAttr]))
if data := access.Search(nil, fmt.Sprintf("%v", p[resp.SearchAttr]), false, 0, 1); len(data.Data) > 0 {
access.DeleteOne(data.Data[0].GetID())
}
}

20
go.mod
View File

@@ -7,6 +7,7 @@ require (
github.com/go-playground/validator/v10 v10.22.0
github.com/google/uuid v1.6.0
github.com/goraz/onion v0.1.3
github.com/libp2p/go-libp2p/core v0.43.0-rc2
github.com/nats-io/nats.go v1.37.0
github.com/rs/zerolog v1.33.0
github.com/stretchr/testify v1.11.1
@@ -22,18 +23,33 @@ require (
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.23.0 // indirect
github.com/google/gnostic-models v0.7.0 // indirect
github.com/ipfs/go-cid v0.5.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/cpuid/v2 v2.2.10 // indirect
github.com/libp2p/go-buffer-pool v0.1.0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/minio/sha256-simd v1.0.1 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect
github.com/mr-tron/base58 v1.2.0 // indirect
github.com/multiformats/go-base32 v0.1.0 // indirect
github.com/multiformats/go-base36 v0.2.0 // indirect
github.com/multiformats/go-multiaddr v0.16.0 // indirect
github.com/multiformats/go-multibase v0.2.0 // indirect
github.com/multiformats/go-multicodec v0.9.1 // indirect
github.com/multiformats/go-multihash v0.2.3 // indirect
github.com/multiformats/go-multistream v0.6.1 // indirect
github.com/multiformats/go-varint v0.0.7 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/x448/float16 v0.8.4 // indirect
go.yaml.in/yaml/v2 v2.4.3 // indirect
go.yaml.in/yaml/v3 v3.0.4 // indirect
golang.org/x/exp v0.0.0-20250606033433-dcc06ee1d476 // indirect
golang.org/x/oauth2 v0.30.0 // indirect
golang.org/x/term v0.37.0 // indirect
golang.org/x/time v0.9.0 // indirect
@@ -42,6 +58,7 @@ require (
k8s.io/klog/v2 v2.130.1 // indirect
k8s.io/kube-openapi v0.0.0-20250910181357-589584f1c912 // indirect
k8s.io/utils v0.0.0-20251002143259-bc988d571ff4 // indirect
lukechampine.com/blake3 v1.4.1 // indirect
sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730 // indirect
sigs.k8s.io/randfill v1.0.0 // indirect
sigs.k8s.io/structured-merge-diff/v6 v6.3.0 // indirect
@@ -67,7 +84,6 @@ require (
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
github.com/libp2p/go-libp2p/core v0.43.0-rc2
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/montanaflynn/stats v0.7.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
@@ -89,3 +105,5 @@ require (
k8s.io/api v0.35.1
k8s.io/client-go v0.35.1
)
replace github.com/libp2p/go-libp2p/core => github.com/libp2p/go-libp2p v0.47.0

2
go.sum
View File

@@ -133,6 +133,8 @@ github.com/multiformats/go-multicodec v0.9.1 h1:x/Fuxr7ZuR4jJV4Os5g444F7xC4XmyUa
github.com/multiformats/go-multicodec v0.9.1/go.mod h1:LLWNMtyV5ithSBUo3vFIMaeDy+h3EbkMTek1m+Fybbo=
github.com/multiformats/go-multihash v0.2.3 h1:7Lyc8XfX/IY2jWb/gI7JP+o7JEq9hOa7BFvVU9RSh+U=
github.com/multiformats/go-multihash v0.2.3/go.mod h1:dXgKXCXjBzdscBLk9JkjINiEsCKRVch90MdaGiKsvSM=
github.com/multiformats/go-multistream v0.6.1 h1:4aoX5v6T+yWmc2raBHsTvzmFhOI8WVOer28DeBBEYdQ=
github.com/multiformats/go-multistream v0.6.1/go.mod h1:ksQf6kqHAb6zIsyw7Zm+gAuVo57Qbq84E27YlYqavqw=
github.com/multiformats/go-varint v0.0.7 h1:sWSGR+f/eu5ABZA2ZpYKBILXTTs9JWpdEM/nEGOHFS8=
github.com/multiformats/go-varint v0.0.7/go.mod h1:r8PUYw/fD/SjBCiKOoDlGF6QawOELpZAu9eioSos/OU=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=

View File

@@ -0,0 +1,56 @@
package allowed_image
import (
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
)
// AllowedImage represents a container image that is allowed to persist
// on a peer after a workflow execution.
//
// The retention decision is entirely local to the datacenter —
// the processing provider has no control over this list.
//
// Matching rule (on the oc-datacenter side):
//   - empty Registry      = any registry
//   - empty TagConstraint = any version
//   - non-empty TagConstraint = exact match or glob (e.g. "3.*", "1.2.3")
//
// IsDefault entries are created at bootstrap and cannot be deleted
// through the API.
type AllowedImage struct {
	utils.AbstractObject
	// Registry is the source registry (e.g. "docker.io", "registry.example.com").
	// Empty = wildcard: any registry is accepted.
	Registry string `json:"registry,omitempty" bson:"registry,omitempty"`
	// Image is the image name without registry or tag
	// (e.g. "natsio/nats-box", "library/alpine").
	Image string `json:"image" bson:"image" validate:"required"`
	// TagConstraint is the constraint on the tag.
	// Empty = every version is allowed.
	// Supports exact ("1.2.3") or glob ("3.*", "*-alpine") forms.
	TagConstraint string `json:"tag_constraint,omitempty" bson:"tag_constraint,omitempty"`
	// IsDefault marks bootstrap entries inserted at startup.
	// These entries cannot be deleted through the API.
	IsDefault bool `json:"is_default,omitempty" bson:"is_default,omitempty"`
}

// StoreDraftDefault makes allowed images active immediately on creation:
// they are never stored as drafts.
func (a *AllowedImage) StoreDraftDefault() {
	a.IsDraft = false // allowed images are active immediately
}

// CanUpdate always accepts the update set unchanged.
func (a *AllowedImage) CanUpdate(set utils.DBObject) (bool, utils.DBObject) {
	return true, set
}

// CanDelete forbids deleting bootstrap (IsDefault) entries.
func (a *AllowedImage) CanDelete() bool {
	return !a.IsDefault // bootstrap entries are not deletable
}

// GetAccessor returns the MongoDB accessor for AllowedImage documents.
func (a *AllowedImage) GetAccessor(request *tools.APIRequest) utils.Accessor {
	return NewAccessor(request)
}

View File

@@ -0,0 +1,23 @@
package allowed_image
import (
"cloud.o-forge.io/core/oc-lib/logs"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
)
// allowedImageMongoAccessor is the MongoDB-backed accessor for AllowedImage
// documents; all generic CRUD behavior comes from the embedded
// utils.AbstractAccessor.
type allowedImageMongoAccessor struct {
	utils.AbstractAccessor[*AllowedImage]
}

// NewAccessor wires up an AllowedImage accessor bound to the given request.
// CopyOne is declared unimplemented for this collection.
func NewAccessor(request *tools.APIRequest) *allowedImageMongoAccessor {
	abstract := utils.AbstractAccessor[*AllowedImage]{
		Logger:         logs.CreateLogger(tools.ALLOWED_IMAGE.String()),
		Request:        request,
		Type:           tools.ALLOWED_IMAGE,
		New:            func() *AllowedImage { return &AllowedImage{} },
		NotImplemented: []string{"CopyOne"},
	}
	return &allowedImageMongoAccessor{AbstractAccessor: abstract}
}

View File

@@ -28,6 +28,20 @@ type Bill struct {
Total float64 `json:"total" bson:"total" validate:"required"`
}
// Extend reports, for each requested extension name, which data types must be
// loaded to enrich a serialized Bill; a Bill only knows how to extend "order".
func (ri *Bill) Extend(typ ...string) map[string][]tools.DataType {
	ext := ri.AbstractObject.Extend(typ...)
	for _, requested := range typ {
		if requested != "order" {
			continue
		}
		if _, present := ext[requested]; !present {
			ext[requested] = []tools.DataType{}
		}
		ext[requested] = append(ext[requested], tools.ORDER)
	}
	return ext
}
func GenerateBill(order *order.Order, request *tools.APIRequest) (*Bill, error) {
// hhmmm : should get... the loop.
return &Bill{
@@ -221,7 +235,7 @@ func (d *PeerItemOrder) GetPriceHT(request *tools.APIRequest) (float64, error) {
And: map[string][]dbs.Filter{
"resource_id": {{Operator: dbs.EQUAL.String(), Value: priced.GetID()}},
},
}, "", d.Purchase.IsDraft)
}, "", d.Purchase.IsDraft, 0, 10000)
if code == 200 && len(search) > 0 {
for _, s := range search {
if s.(*purchase_resource.PurchaseResource).EndDate == nil || time.Now().UTC().After(*s.(*purchase_resource.PurchaseResource).EndDate) {

View File

@@ -14,6 +14,9 @@ import (
*/
type Booking struct {
utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name)
ToMaster string `json:"to_master,omitempty" bson:"to_master,omitempty"`
FromNano string `json:"from_nano,omitempty" bson:"priced_item,omitempty"`
PricedItem map[string]interface{} `json:"priced_item,omitempty" bson:"priced_item,omitempty"` // We need to add the validate:"required" tag once the pricing feature is implemented, removed to avoid handling the error
ResumeMetrics map[string]map[string]models.MetricResume `json:"resume_metrics,omitempty" bson:"resume_metrics,omitempty"`
@@ -37,6 +40,15 @@ type Booking struct {
// Authorization: identifies who created this draft and the Check session it belongs to.
// Used to verify UPDATE and DELETE orders from remote schedulers.
SchedulerPeerID string `json:"scheduler_peer_id,omitempty" bson:"scheduler_peer_id,omitempty"`
// Peerless is true when the booked resource has no destination peer
// (e.g. a public Docker Hub image). No peer confirmation or pricing
// negotiation is needed; the booking is stored locally only.
Peerless bool `json:"peerless,omitempty" bson:"peerless,omitempty"`
// OriginRef carries the registry reference of a peerless resource
// (e.g. "docker.io/pytorch/pytorch:2.1") so schedulers can validate it.
OriginRef string `json:"origin_ref,omitempty" bson:"origin_ref,omitempty"`
}
func (b *Booking) CalcDeltaOfExecution() map[string]map[string]models.MetricResume {

View File

@@ -74,7 +74,7 @@ func generate(request *tools.APIRequest, shallow bool) (*Planner, error) {
And: map[string][]dbs.Filter{
"expected_start_date": {{Operator: dbs.GTE.String(), Value: time.Now().UTC()}},
},
}, "*", false)
}, "*", false, 0, 10000)
if code != 200 || err != nil {
return nil, err
}
@@ -82,7 +82,7 @@ func generate(request *tools.APIRequest, shallow bool) (*Planner, error) {
And: map[string][]dbs.Filter{
"expected_start_date": {{Operator: dbs.GTE.String(), Value: time.Now().UTC()}},
},
}, "*", true)
}, "*", true, 0, 10000)
bookings := append(confirmed, drafts...)
p := &Planner{

View File

@@ -91,7 +91,7 @@ func filterEnrich[T utils.ShallowDBObject](arr []string, isDrafted bool, a utils
Or: map[string][]dbs.Filter{
"abstractobject.id": {{Operator: dbs.IN.String(), Value: arr}},
},
}, "", isDrafted)
}, "", isDrafted, 0, int64(len(arr)))
if code == 200 {
for _, r := range res {
new = append(new, r.(T))

View File

@@ -32,6 +32,7 @@ const (
FORGOTTEN
DELAYED
CANCELLED
IN_PREPARATION
)
var str = [...]string{
@@ -43,6 +44,7 @@ var str = [...]string{
"forgotten",
"delayed",
"cancelled",
"in_preparation",
}
func FromInt(i int) string {
@@ -60,5 +62,5 @@ func (d BookingStatus) EnumIndex() int {
// List
func StatusList() []BookingStatus {
return []BookingStatus{DRAFT, SCHEDULED, STARTED, FAILURE, SUCCESS, FORGOTTEN, DELAYED, CANCELLED}
return []BookingStatus{DRAFT, SCHEDULED, STARTED, FAILURE, SUCCESS, FORGOTTEN, DELAYED, CANCELLED, IN_PREPARATION}
}

View File

@@ -6,8 +6,6 @@ type Container struct {
Args string `json:"args,omitempty" bson:"args,omitempty"` // Args is the container arguments
Env map[string]string `json:"env,omitempty" bson:"env,omitempty"` // Env is the container environment variables
Volumes map[string]string `json:"volumes,omitempty" bson:"volumes,omitempty"` // Volumes is the container volumes
Exposes []Expose `bson:"exposes,omitempty" json:"exposes,omitempty"` // Expose is the execution
}
type Expose struct {

View File

@@ -9,6 +9,7 @@ import (
type PricedItemITF interface {
GetID() string
GetName() string
GetInstanceID() string
GetType() tools.DataType
IsPurchasable() bool

View File

@@ -28,7 +28,25 @@ func RefundTypeList() []RefundType {
return []RefundType{REFUND_DEAD_END, REFUND_ON_ERROR, REFUND_ON_EARLY_END}
}
// PaymentType enumerates how often a priced resource is billed.
type PaymentType int

const (
	PAY_ONCE PaymentType = iota
	PAY_EVERY_WEEK
	PAY_EVERY_MONTH
	PAY_EVERY_YEAR
)

// String returns the display label for the payment type.
// NOTE(review): PAY_ONCE renders as "PAY ONCE" (space) while the others use
// underscores — looks like a typo, but the labels are kept byte-for-byte in
// case they are persisted or matched elsewhere; confirm before normalizing.
// Indexing panics for values outside the declared constants.
func (t PaymentType) String() string {
	labels := [...]string{"PAY ONCE", "PAY_EVERY_WEEK", "PAY_EVERY_MONTH", "PAY_EVERY_YEAR"}
	return labels[t]
}

// PaymentTypeList enumerates every supported payment cadence.
func PaymentTypeList() []PaymentType {
	return []PaymentType{PAY_ONCE, PAY_EVERY_WEEK, PAY_EVERY_MONTH, PAY_EVERY_YEAR}
}
type AccessPricingProfile[T Strategy] struct { // only use for acces such as : DATA && PROCESSING
AllowedPaymentType []PaymentType `json:"allowed_payment_type,omitempty" bson:"allowed_payment_type,omitempty"` // Price is the price of the resource
Pricing PricingStrategy[T] `json:"pricing,omitempty" bson:"pricing,omitempty"` // Price is the price of the resource
DefaultRefund RefundType `json:"default_refund" bson:"default_refund"` // DefaultRefund is the default refund type of the pricing
RefundRatio int32 `json:"refund_ratio" bson:"refund_ratio" default:"0"` // RefundRatio is the refund ratio if missing

View File

@@ -154,6 +154,8 @@ func BookingEstimation(t TimePricingStrategy, price float64, locationDurationInS
type PricingStrategy[T Strategy] struct {
Price float64 `json:"price" bson:"price" default:"0"` // Price is the Price of the pricing
Currency string `json:"currency" bson:"currency" default:"USD"` // Currency is the currency of the pricing
// NO NEED ?
BuyingStrategy BuyingStrategy `json:"buying_strategy" bson:"buying_strategy" default:"0"` // BuyingStrategy is the buying strategy of the pricing
TimePricingStrategy TimePricingStrategy `json:"time_pricing_strategy" bson:"time_pricing_strategy" default:"0"` // TimePricingStrategy is the time pricing strategy of the pricing
OverrideStrategy T `json:"override_strategy" bson:"override_strategy" default:"-1"` // Modulation is the modulation of the pricing

View File

@@ -18,6 +18,20 @@ type ExecutionVerification struct {
Validate bool `json:"validate" bson:"validate,omitempty"`
}
// Extend maps each requested extension key to the data types reachable from
// an ExecutionVerification, building on the embedded AbstractObject extensions.
func (ri *ExecutionVerification) Extend(typ ...string) map[string][]tools.DataType {
	ext := ri.AbstractObject.Extend(typ...)
	for _, t := range typ {
		switch t {
		// FIX: was the typo "wokflow", which made Extend("workflow") a no-op.
		case "workflow":
			if _, ok := ext[t]; !ok {
				ext[t] = []tools.DataType{}
			}
			ext[t] = append(ext[t], tools.WORKFLOW)
		}
	}
	return ext
}
// StoreDraftDefault forces new execution verifications out of draft state.
// NOTE(review): marked TEMPORARY in the original — confirm whether drafts
// should eventually be supported here.
func (r *ExecutionVerification) StoreDraftDefault() {
	r.IsDraft = false // TODO: TEMPORARY
}

View File

@@ -44,6 +44,25 @@ type AbstractLive struct {
ResourcesID []string `json:"resources_id" bson:"resources_id"`
}
// Extend lists, per requested extension key, the data types reachable from an
// AbstractLive object, on top of the embedded AbstractObject extensions.
// The "resource" key expands to every resource kind a live object can bind to.
func (ri *AbstractLive) Extend(typ ...string) map[string][]tools.DataType {
	ext := ri.AbstractObject.Extend(typ...)
	for _, key := range typ {
		if key != "resource" {
			continue
		}
		if _, present := ext[key]; !present {
			ext[key] = []tools.DataType{}
		}
		ext[key] = append(ext[key],
			tools.WORKFLOW_RESOURCE,
			tools.DATA_RESOURCE,
			tools.COMPUTE_RESOURCE,
			tools.STORAGE_RESOURCE,
			tools.PROCESSING_RESOURCE,
			tools.SERVICE_RESOURCE,
		)
	}
	return ext
}
// GetMonitorPath returns the monitoring endpoint path configured on the live object.
func (d *AbstractLive) GetMonitorPath() string {
	return d.MonitorPath
}

View File

@@ -33,7 +33,7 @@ func (d *LiveDatacenter) GetAccessor(request *tools.APIRequest) utils.Accessor {
return NewAccessor[*LiveDatacenter](tools.LIVE_DATACENTER, request) // Create a new instance of the accessor
}
func (d *LiveDatacenter) GetResourceAccessor(request *tools.APIRequest) utils.Accessor {
return resources.NewAccessor[*resources.ComputeResource](tools.COMPUTE_RESOURCE, request, func() utils.DBObject { return &resources.ComputeResource{} })
return resources.NewAccessor[*resources.ComputeResource](tools.COMPUTE_RESOURCE, request)
}
func (d *LiveDatacenter) GetResource() resources.ResourceInterface {

View File

@@ -4,6 +4,7 @@ import (
"encoding/json"
"errors"
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/logs"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
@@ -89,3 +90,25 @@ func (a *liveMongoAccessor[T]) CopyOne(data utils.DBObject) (utils.DBObject, int
}
}
}
// LoadAll pages through every stored live object, delegating to the generic
// loader with this accessor's identity projection (see GetExec).
func (wfa *liveMongoAccessor[T]) LoadAll(isDraft bool, offset int64, limit int64) ([]utils.ShallowDBObject, int, error) {
	return utils.GenericLoadAll[T](wfa.GetExec(isDraft), isDraft, wfa, offset, limit)
}
// Search returns live objects matching the filters / search string.
// A nil filter combined with the wildcard "*" degenerates to a plain LoadAll.
// FIX: reuse GetExec instead of re-declaring the identical identity closure
// twice, keeping the projection logic in one place.
func (wfa *liveMongoAccessor[T]) Search(filters *dbs.Filters, search string, isDraft bool, offset int64, limit int64) ([]utils.ShallowDBObject, int, error) {
	if filters == nil && search == "*" {
		return utils.GenericLoadAll[T](wfa.GetExec(isDraft), isDraft, wfa, offset, limit)
	}
	return utils.GenericSearch[T](filters, search, wfa.New().GetObjectFilters(search),
		wfa.GetExec(isDraft), isDraft, wfa, offset, limit)
}
// GetExec returns the per-result projection used by the load/search helpers.
// Live objects are returned as-is (no shallowing); isDraft is ignored here.
func (a *liveMongoAccessor[T]) GetExec(isDraft bool) func(utils.DBObject) utils.ShallowDBObject {
	return func(d utils.DBObject) utils.ShallowDBObject {
		return d
	}
}

View File

@@ -29,7 +29,7 @@ func (d *LiveStorage) GetAccessor(request *tools.APIRequest) utils.Accessor {
return NewAccessor[*LiveStorage](tools.LIVE_STORAGE, request) // Create a new instance of the accessor
}
func (d *LiveStorage) GetResourceAccessor(request *tools.APIRequest) utils.Accessor {
return resources.NewAccessor[*resources.ComputeResource](tools.STORAGE_RESOURCE, request, func() utils.DBObject { return &resources.StorageResource{} })
return resources.NewAccessor[*resources.StorageResource](tools.STORAGE_RESOURCE, request)
}
func (d *LiveStorage) GetResource() resources.ResourceInterface {

View File

@@ -2,11 +2,13 @@ package models
import (
"cloud.o-forge.io/core/oc-lib/logs"
"cloud.o-forge.io/core/oc-lib/models/allowed_image"
"cloud.o-forge.io/core/oc-lib/models/bill"
"cloud.o-forge.io/core/oc-lib/models/execution_verification"
"cloud.o-forge.io/core/oc-lib/models/live"
"cloud.o-forge.io/core/oc-lib/models/order"
"cloud.o-forge.io/core/oc-lib/models/resources/purchase_resource"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
"cloud.o-forge.io/core/oc-lib/models/booking"
@@ -14,7 +16,6 @@ import (
"cloud.o-forge.io/core/oc-lib/models/collaborative_area/rules/rule"
"cloud.o-forge.io/core/oc-lib/models/peer"
resource "cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/models/utils"
w2 "cloud.o-forge.io/core/oc-lib/models/workflow"
"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
w3 "cloud.o-forge.io/core/oc-lib/models/workspace"
@@ -30,6 +31,7 @@ var ModelsCatalog = map[string]func() utils.DBObject{
tools.COMPUTE_RESOURCE.String(): func() utils.DBObject { return &resource.ComputeResource{} },
tools.STORAGE_RESOURCE.String(): func() utils.DBObject { return &resource.StorageResource{} },
tools.PROCESSING_RESOURCE.String(): func() utils.DBObject { return &resource.ProcessingResource{} },
tools.SERVICE_RESOURCE.String(): func() utils.DBObject { return &resource.ServiceResource{} },
tools.NATIVE_TOOL.String(): func() utils.DBObject { return &resource.NativeTool{} },
tools.WORKFLOW.String(): func() utils.DBObject { return &w2.Workflow{} },
tools.WORKFLOW_EXECUTION.String(): func() utils.DBObject { return &workflow_execution.WorkflowExecution{} },
@@ -46,6 +48,7 @@ var ModelsCatalog = map[string]func() utils.DBObject{
tools.LIVE_STORAGE.String(): func() utils.DBObject { return &live.LiveStorage{} },
tools.BILL.String(): func() utils.DBObject { return &bill.Bill{} },
tools.EXECUTION_VERIFICATION.String(): func() utils.DBObject { return &execution_verification.ExecutionVerification{} },
tools.ALLOWED_IMAGE.String(): func() utils.DBObject { return &allowed_image.AllowedImage{} },
}
// Model returns the model object based on the model type

View File

@@ -3,9 +3,19 @@ package peer
import (
"fmt"
"strings"
"time"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
"github.com/biter777/countries"
)
// PeerPerm enumerates the permissions a peer may hold (see Peer.PeerPerms).
type PeerPerm int

const (
	// FIX: these constants were declared with type PeerRelation, although they
	// populate Peer.PeerPerms ([]PeerPerm) and belong to the PeerPerm enum.
	READ PeerPerm = iota
	WRITE
	MONITOR
)
type PeerRelation int
@@ -16,9 +26,13 @@ const (
PARTNER
BLACKLIST
PENDING_PARTNER
MASTER
NANO
PENDING_NANO
PENDING_MASTER
)
var path = []string{"unknown", "self", "partner", "blacklist", "partner"}
var path = []string{"unknown", "self", "partner", "blacklist", "partner", "pending_partner", "master", "nano", "pending_nano", "pending_master"}
func GetRelationPath(str string) int {
for i, p := range path {
@@ -41,11 +55,47 @@ func (m PeerRelation) EnumIndex() int {
return int(m)
}
// BehaviorWarning records a single misbehavior observed by a trusted service.
type BehaviorWarning struct {
At time.Time `json:"at" bson:"at"`
ReporterApp string `json:"reporter_app" bson:"reporter_app"`
Severity tools.BehaviorSeverity `json:"severity" bson:"severity"`
Reason string `json:"reason" bson:"reason"`
Evidence string `json:"evidence,omitempty" bson:"evidence,omitempty"`
}
// PeerLocation holds the voluntarily disclosed geographic position of a node.
// Granularity controls how precise the location is:
//
// 0 = not disclosed
// 1 = continent (±15°)
// 2 = country (±3°) — default
// 3 = region (±0.5°)
// 4 = city (±0.05°)
//
// The coordinates are always fuzzed by oc-discovery before publication,
// so a granularity-2 location identifies only the rough country area.
type PeerLocation struct {
	Latitude    float64               `json:"latitude" bson:"latitude"`                      // fuzzed latitude (precision per Granularity)
	Longitude   float64               `json:"longitude" bson:"longitude"`                    // fuzzed longitude (precision per Granularity)
	Granularity int                   `json:"granularity" bson:"granularity"`                // 0 = undisclosed … 4 = city
	Country     countries.CountryCode `json:"country,omitempty" bson:"country,omitempty"`    // declared country, if any
	Timezone    string                `json:"timezone,omitempty" bson:"timezone,omitempty"`  // timezone name — presumably IANA format, TODO confirm
}
// Peer is a struct that represents a peer
type Peer struct {
utils.AbstractObject
IsNano bool `json:"is_nano" bson:"is_nano"`
PeerPerms []PeerPerm `json:"peer_perms" bson:"peer_perms"`
RelationLastChangeDate time.Time `json:"relation_last_change_date" bson:"relation_last_change_date"`
RelationLastChangeUser string `json:"relation_last_change_user" bson:"relation_last_change_user"`
Verify bool `json:"verify" bson:"verify"`
OrganizationID string `json:"organization_id" bson:"organization_id"`
PeerID string `json:"peer_id" bson:"peer_id" validate:"required"`
APIUrl string `json:"api_url" bson:"api_url" validate:"required"` // Url is the URL of the peer (base64url)
@@ -56,12 +106,76 @@ type Peer struct {
Relation PeerRelation `json:"relation" bson:"relation" default:"0"`
ServicesState map[string]int `json:"services_state,omitempty" bson:"services_state,omitempty"`
FailedExecution []PeerExecution `json:"failed_execution" bson:"failed_execution"` // FailedExecution is the list of failed executions, to be retried
// Location is the voluntarily disclosed (and fuzzed) geographic position.
Location *PeerLocation `json:"location,omitempty" bson:"location,omitempty"`
// Trust scoring — maintained by oc-discovery from PEER_BEHAVIOR_EVENT reports.
TrustScore float64 `json:"trust_score" bson:"trust_score" default:"100"`
BlacklistReason string `json:"blacklist_reason,omitempty" bson:"blacklist_reason,omitempty"`
BehaviorWarnings []BehaviorWarning `json:"behavior_warnings,omitempty" bson:"behavior_warnings,omitempty"`
// Volatile connectivity state — never persisted to DB (bson:"-").
// Set in-memory by oc-peer when it receives a PEER_OBSERVE_RESPONSE_EVENT.
// Considered offline when LastHeartbeat is older than 60 s (30 s interval + 30 s grace).
Online bool `json:"online" bson:"-"`
LastHeartbeat *time.Time `json:"last_heartbeat,omitempty" bson:"-"`
}
// Extend reports, for each requested extension key, the data types that can
// be joined from a Peer, building on the embedded AbstractObject extensions.
func (ri *Peer) Extend(typ ...string) map[string][]tools.DataType {
	ext := ri.AbstractObject.Extend(typ...)
	for _, key := range typ {
		if key != "peer" {
			continue
		}
		if _, present := ext[key]; !present {
			ext[key] = []tools.DataType{}
		}
		ext[key] = append(ext[key], tools.PEER)
	}
	return ext
}
// VerifyAuth always authorizes access to Peer objects, regardless of the call
// or requester.
// NOTE(review): this disables per-call auth for peers entirely — confirm this
// is intentional and not a temporary bypass.
func (ao *Peer) VerifyAuth(callName string, request *tools.APIRequest) bool {
	return true
}
// BlacklistThreshold is the trust score below which a peer is auto-blacklisted.
const BlacklistThreshold = 20.0

// ApplyBehaviorReport records a misbehavior, deducts the trust penalty, and
// returns true when the trust score has fallen below BlacklistThreshold so the
// caller can trigger the relation change.
func (p *Peer) ApplyBehaviorReport(r tools.PeerBehaviorReport) (shouldBlacklist bool) {
	// Initialise the score only for peers that have never been scored.
	// FIX: the previous check (TrustScore == 0, performed after appending) also
	// matched a peer legitimately clamped to 0 by prior penalties, silently
	// resetting it to 100 on the next report. A zero score with no recorded
	// warnings is the only state that means "never set".
	if p.TrustScore == 0 && len(p.BehaviorWarnings) == 0 {
		p.TrustScore = 100
	}
	p.BehaviorWarnings = append(p.BehaviorWarnings, BehaviorWarning{
		At:          r.At,
		ReporterApp: r.ReporterApp,
		Severity:    r.Severity,
		Reason:      r.Reason,
		Evidence:    r.Evidence,
	})
	p.TrustScore -= r.Severity.Penalty()
	if p.TrustScore < 0 {
		p.TrustScore = 0
	}
	if p.TrustScore <= BlacklistThreshold {
		p.BlacklistReason = r.Reason
		return true
	}
	return false
}
// ResetTrust clears all behavior history and resets the trust score to 100.
// Must be called when a peer relation is manually set to NONE or PARTNER.
func (p *Peer) ResetTrust() {
	p.TrustScore = 100
	p.BlacklistReason = ""
	p.BehaviorWarnings = nil // nil (not empty slice) so bson/json omitempty drops the field
}
// AddExecution adds an execution to the list of failed executions
func (ao *Peer) AddExecution(exec PeerExecution) {
found := false
@@ -96,7 +210,3 @@ func (d *Peer) GetAccessor(request *tools.APIRequest) utils.Accessor {
data := NewAccessor(request) // Create a new instance of the accessor
return data
}
// CanDelete reports whether a peer can be removed through the generic CRUD
// path; peers are never deletable here. (The original inline comment said
// "draft order" — a copy-paste from another model.)
func (r *Peer) CanDelete() bool {
	return false // peers can never be deleted via this path
}

View File

@@ -42,6 +42,23 @@ func (wfa *peerMongoAccessor) ShouldVerifyAuth() bool {
return !wfa.OverrideAuth
}
/*
TODO : organization_ID est un peer_ID duquel on se revendique faire parti.
Ca implique une clé d'organisation + une demande d'intégration.
Slave-Master IOT
*/
// StoreOne persists a peer. When the incoming peer declares the same
// OrganizationID as our own peer record, its relation is defaulted to PARTNER.
// NOTE(review): the error from GetMySelf is discarded — when it fails, pp is
// nil and the organization check is silently skipped; confirm this best-effort
// behavior is acceptable.
func (dca *peerMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) {
	pp, _ := utils.GetMySelf(NewAccessor(&tools.APIRequest{Admin: true}))
	if data != nil {
		d := data.(*Peer)
		if pp != nil && d.OrganizationID != "" && d.OrganizationID == pp.(*Peer).OrganizationID {
			d.Relation = PARTNER // defaulting on partner if same organization.
		}
	}
	return utils.GenericStoreOne(data, dca)
}
/*
* Nothing special here, just the basic CRUD operations
*/

View File

@@ -25,7 +25,7 @@ type ComputeResource struct {
}
func (d *ComputeResource) GetAccessor(request *tools.APIRequest) utils.Accessor {
return NewAccessor[*ComputeResource](tools.COMPUTE_RESOURCE, request, func() utils.DBObject { return &ComputeResource{} })
return NewAccessor[*ComputeResource](tools.COMPUTE_RESOURCE, request)
}
func (r *ComputeResource) GetType() string {
@@ -56,15 +56,22 @@ type ComputeNode struct {
type ComputeResourceInstance struct {
ResourceInstance[*ComputeResourcePartnership]
Source string `json:"source,omitempty" bson:"source,omitempty"` // Source is the source of the resource
Source string `json:"source,omitempty" bson:"source,omitempty"`
SecurityLevel string `json:"security_level,omitempty" bson:"security_level,omitempty"`
PowerSources []string `json:"power_sources,omitempty" bson:"power_sources,omitempty"`
AnnualCO2Emissions float64 `json:"annual_co2_emissions,omitempty" bson:"co2_emissions,omitempty"`
CPUs map[string]*models.CPU `bson:"cpus,omitempty" json:"cpus,omitempty"` // CPUs is the list of CPUs key is model
GPUs map[string]*models.GPU `bson:"gpus,omitempty" json:"gpus,omitempty"` // GPUs is the list of GPUs key is model
CPUs map[string]*models.CPU `bson:"cpus,omitempty" json:"cpus,omitempty"`
GPUs map[string]*models.GPU `bson:"gpus,omitempty" json:"gpus,omitempty"`
Nodes []*ComputeNode `json:"nodes,omitempty" bson:"nodes,omitempty"`
// AvailableStorages lists storage capabilities activatable on this compute unit (e.g. Minio, local volumes).
// These are shallow StorageResource entries — not independent catalog items — but carry full pricing structure.
AvailableStorages []*StorageResource `json:"available_storages,omitempty" bson:"available_storages,omitempty"`
}
// IsPeerless is always false for compute instances: a compute resource is
// infrastructure owned by a peer and can never be declared peerless.
// This overrides the structural check on the embedded ResourceInstance.
func (ri *ComputeResourceInstance) IsPeerless() bool { return false }
func NewComputeResourceInstance(name string, peerID string) ResourceInstanceITF {
return &ComputeResourceInstance{
ResourceInstance: ResourceInstance[*ComputeResourcePartnership]{

View File

@@ -30,13 +30,22 @@ type DataResource struct {
}
func (d *DataResource) GetAccessor(request *tools.APIRequest) utils.Accessor {
return NewAccessor[*DataResource](tools.DATA_RESOURCE, request, func() utils.DBObject { return &DataResource{} }) // Create a new instance of the accessor
return NewAccessor[*DataResource](tools.DATA_RESOURCE, request) // Create a new instance of the accessor
}
// GetType returns the catalog type string identifying a data resource.
func (r *DataResource) GetType() string {
	return tools.DATA_RESOURCE.String()
}
// StoreDraftDefault applies the embedded AbstractObject draft defaults and
// guarantees a single read-only "source" env parameter bound to the selected
// instance's source.
func (ri *DataResource) StoreDraftDefault() {
	ri.AbstractObject.StoreDraftDefault()
	// FIX: inject the parameter only once — the previous version appended a
	// duplicate "source" entry on every call (the guard existed in the removed
	// DataInstance.StoreDraftDefault and was lost in the move).
	for _, p := range ri.Env {
		if p.Attr == "source" {
			return
		}
	}
	ri.Env = append(ri.Env, models.Param{
		Attr:     "source",
		Value:    "[resource]instance.source",
		Readonly: true,
	})
}
func (abs *DataResource) ConvertToPricedResource(t tools.DataType, selectedInstance *int, selectedPartnership *int, selectedBuyingStrategy *int, selectedStrategy *int, selectedBookingModeIndex *int, request *tools.APIRequest) (pricing.PricedItemITF, error) {
if t != tools.DATA_RESOURCE {
return nil, errors.New("not the proper type expected : cannot convert to priced resource : have " + t.String() + " wait Data")
@@ -67,24 +76,6 @@ func NewDataInstance(name string, peerID string) ResourceInstanceITF {
}
}
// StoreDraftDefault seeds the instance's draft defaults: it injects a
// read-only "source" env parameter when none is present yet, then defers to
// the embedded ResourceInstance defaults.
func (ri *DataInstance) StoreDraftDefault() {
	hasSource := false
	for _, param := range ri.ResourceInstance.Env {
		if param.Attr == "source" {
			hasSource = true
			break
		}
	}
	if !hasSource {
		ri.ResourceInstance.Env = append(ri.ResourceInstance.Env, models.Param{
			Attr:     "source",
			Value:    ri.Source,
			Readonly: true,
		})
	}
	ri.ResourceInstance.StoreDraftDefault()
}
type DataResourcePartnership struct {
ResourcePartnerShip[*DataResourcePricingProfile]
MaxDownloadableGbAllowed float64 `json:"allowed_gb,omitempty" bson:"allowed_gb,omitempty"`
@@ -95,7 +86,7 @@ type DataResourcePartnership struct {
type DataResourcePricingStrategy int
const (
PER_DOWNLOAD DataResourcePricingStrategy = iota + 6
PER_DOWNLOAD DataResourcePricingStrategy = iota + 7
PER_TB_DOWNLOADED
PER_GB_DOWNLOADED
PER_MB_DOWNLOADED

View File

@@ -3,6 +3,7 @@ package resources
import (
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/models/booking"
"cloud.o-forge.io/core/oc-lib/models/common/models"
"cloud.o-forge.io/core/oc-lib/models/common/pricing"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
@@ -19,17 +20,23 @@ type ResourceInterface interface {
ConvertToPricedResource(t tools.DataType, a *int, selectedPartnership *int, selectedBuyingStrategy *int, selectedStrategy *int, b *int, request *tools.APIRequest) (pricing.PricedItemITF, error)
GetType() string
ClearEnv() utils.DBObject
VerifyBuy()
SetAllowedInstances(request *tools.APIRequest, instance_id ...string) []ResourceInstanceITF
AddInstances(instance ResourceInstanceITF)
GetSelectedInstance(index *int) ResourceInstanceITF
StoreDraftDefault()
GetEnv() []models.Param
GetInputs() []models.Param
GetOutputs() []models.Param
}
type ResourceInstanceITF interface {
utils.DBObject
GetID() string
GetName() string
StoreDraftDefault()
ClearEnv()
GetOrigin() OriginMeta
IsPeerless() bool
FilterInstance(peerID string)
GetProfile(peerID string, partnershipIndex *int, buying *int, strategy *int) pricing.PricingProfileITF
GetPricingsProfiles(peerID string, groups []string) []pricing.PricingProfileITF

View File

@@ -12,6 +12,7 @@ type ResourceSet struct {
Computes []string `bson:"computes,omitempty" json:"computes,omitempty"`
Workflows []string `bson:"workflows,omitempty" json:"workflows,omitempty"`
NativeTool []string `bson:"native,omitempty" json:"native,omitempty"`
Services []string `bson:"services,omitempty" json:"services,omitempty"`
DataResources []*DataResource `bson:"-" json:"data_resources,omitempty"`
StorageResources []*StorageResource `bson:"-" json:"storage_resources,omitempty"`
@@ -19,6 +20,7 @@ type ResourceSet struct {
ComputeResources []*ComputeResource `bson:"-" json:"compute_resources,omitempty"`
WorkflowResources []*WorkflowResource `bson:"-" json:"workflow_resources,omitempty"`
NativeTools []*NativeTool `bson:"-" json:"native_tools,omitempty"`
ServiceResources []*ServiceResource `bson:"-" json:"service_resources,omitempty"`
}
func (r *ResourceSet) Clear() {
@@ -27,6 +29,7 @@ func (r *ResourceSet) Clear() {
r.ProcessingResources = nil
r.ComputeResources = nil
r.WorkflowResources = nil
r.ServiceResources = nil
}
func (r *ResourceSet) Fill(request *tools.APIRequest) {
@@ -37,6 +40,7 @@ func (r *ResourceSet) Fill(request *tools.APIRequest) {
(&StorageResource{}): r.Storages,
(&ProcessingResource{}): r.Processings,
(&WorkflowResource{}): r.Workflows,
(&ServiceResource{}): r.Services,
} {
for _, id := range v {
d, _, e := k.GetAccessor(request).LoadOne(id)
@@ -52,6 +56,8 @@ func (r *ResourceSet) Fill(request *tools.APIRequest) {
r.ProcessingResources = append(r.ProcessingResources, d.(*ProcessingResource))
case *WorkflowResource:
r.WorkflowResources = append(r.WorkflowResources, d.(*WorkflowResource))
case *ServiceResource:
r.ServiceResources = append(r.ServiceResources, d.(*ServiceResource))
}
}
}
@@ -65,4 +71,5 @@ type ItemResource struct {
Compute *ComputeResource `bson:"compute,omitempty" json:"compute,omitempty"`
Workflow *WorkflowResource `bson:"workflow,omitempty" json:"workflow,omitempty"`
NativeTool *NativeTool `bson:"native_tools,omitempty" json:"native_tools,omitempty"`
Service *ServiceResource `bson:"service,omitempty" json:"service,omitempty"`
}

View File

@@ -23,7 +23,7 @@ func (d *NativeTool) SetName(name string) {
}
func (d *NativeTool) GetAccessor(request *tools.APIRequest) utils.Accessor {
return NewAccessor[*NativeTool](tools.NATIVE_TOOL, request, func() utils.DBObject { return &NativeTool{} })
return NewAccessor[*NativeTool](tools.NATIVE_TOOL, request)
}
func (r *NativeTool) AddInstances(instance ResourceInstanceITF) {
@@ -61,7 +61,7 @@ func InitNative() {
for _, kind := range []native_tools.NativeToolsEnum{native_tools.WORKFLOW_EVENT} {
newNative := &NativeTool{}
access := newNative.GetAccessor(&tools.APIRequest{Admin: true})
l, _, err := access.Search(nil, kind.String(), false)
l, _, err := access.Search(nil, kind.String(), false, 0, 10)
if err != nil || len(l) == 0 {
newNative.Name = kind.String()
newNative.Kind = int(kind)

View File

@@ -0,0 +1,31 @@
package resources
// OriginType qualifies where a resource instance comes from.
type OriginType int

const (
	// OriginPeer: instance offered by a known network peer (default).
	OriginPeer OriginType = iota
	// OriginPublic: instance from a public registry (Docker Hub, HuggingFace, etc.).
	// No peer confirmation is needed; access is unrestricted.
	OriginPublic
	// OriginSelf: self-hosted instance with no third-party peer.
	OriginSelf
)

// OriginMeta carries provenance information for a resource instance.
type OriginMeta struct {
	Type     OriginType `json:"origin_type" bson:"origin_type"`
	Ref      string     `json:"origin_ref,omitempty" bson:"origin_ref,omitempty"`         // e.g. "docker.io/pytorch/pytorch:2.1"
	License  string     `json:"origin_license,omitempty" bson:"origin_license,omitempty"` // SPDX identifier or free-form
	Verified bool       `json:"origin_verified" bson:"origin_verified"`                   // manually vetted by an OC admin
}

// DeclaredPeerless reports whether the metadata itself claims the instance has
// no owning peer. It MUST NOT be used for authorization decisions — use
// ResourceInstance.IsPeerless() instead, which derives the property from
// structural invariants rather than this self-declared field. This method is
// kept only for display/logging purposes.
func (o OriginMeta) DeclaredPeerless() bool {
	peerOwned := o.Type == OriginPeer
	return !peerOwned
}

View File

@@ -47,6 +47,10 @@ func (abs *PricedResource[T]) GetID() string {
return abs.ResourceID
}
// GetName returns the display name of the priced resource.
func (abs *PricedResource[T]) GetName() string {
	return abs.Name
}
// GetInstanceID returns the ID of the resource instance this pricing applies to.
func (abs *PricedResource[T]) GetInstanceID() string {
	return abs.InstanceID
}

View File

@@ -29,9 +29,8 @@ type ProcessingUsage struct {
type ProcessingResource struct {
AbstractInstanciatedResource[*ProcessingInstance]
IsEvent bool `json:"is_event,omitempty" bson:"is_event,omitempty"`
Infrastructure enum.InfrastructureType `json:"infrastructure" bson:"infrastructure" default:"-1"` // Infrastructure is the infrastructure
IsService bool `json:"is_service,omitempty" bson:"is_service,omitempty"` // IsService is a flag that indicates if the processing is a service
Usage *ProcessingUsage `bson:"usage,omitempty" json:"usage,omitempty"` // Usage is the usage of the processing
Infrastructure enum.InfrastructureType `json:"infrastructure" bson:"infrastructure" default:"-1"`
Usage *ProcessingUsage `bson:"usage,omitempty" json:"usage,omitempty"`
OpenSource bool `json:"open_source" bson:"open_source" default:"false"`
License string `json:"license,omitempty" bson:"license,omitempty"`
Maturity string `json:"maturity,omitempty" bson:"maturity,omitempty"`
@@ -48,6 +47,8 @@ type ProcessingResourceAccess struct {
type ProcessingInstance struct {
ResourceInstance[*ResourcePartnerShip[*ProcessingResourcePricingProfile]]
Access *ProcessingResourceAccess `json:"access,omitempty" bson:"access,omitempty"` // Access is the access
SizeGB int `json:"size_gb,omitempty" bson:"size_gb,omitempty"`
ContentType string `json:"content_type,omitempty" bson:"content_type,omitempty"`
}
func NewProcessingInstance(name string, peerID string) ResourceInstanceITF {
@@ -67,7 +68,6 @@ type ProcessingResourcePartnership struct {
type PricedProcessingResource struct {
PricedResource[*ProcessingResourcePricingProfile]
IsService bool
}
func (r *PricedProcessingResource) ensurePricing() {
@@ -100,10 +100,7 @@ func (a *PricedProcessingResource) GetExplicitDurationInS() float64 {
a.BookingConfiguration = &BookingConfiguration{}
}
if a.BookingConfiguration.ExplicitBookingDurationS == 0 {
if a.IsService || a.BookingConfiguration.UsageStart == nil {
if a.IsService {
return -1
}
if a.BookingConfiguration.UsageStart == nil {
return (5 * time.Minute).Seconds()
}
return a.BookingConfiguration.UsageEnd.Sub(*a.BookingConfiguration.UsageStart).Seconds()
@@ -112,7 +109,7 @@ func (a *PricedProcessingResource) GetExplicitDurationInS() float64 {
}
func (d *ProcessingResource) GetAccessor(request *tools.APIRequest) utils.Accessor {
return NewAccessor[*ProcessingResource](tools.PROCESSING_RESOURCE, request, func() utils.DBObject { return &ProcessingResource{} }) // Create a new instance of the accessor
return NewAccessor[*ProcessingResource](tools.PROCESSING_RESOURCE, request) // Create a new instance of the accessor
}
func (abs *ProcessingResource) ConvertToPricedResource(t tools.DataType, selectedInstance *int, selectedPartnership *int, selectedBuyingStrategy *int, selectedStrategy *int, selectedBookingModeIndex *int, request *tools.APIRequest) (pricing.PricedItemITF, error) {

View File

@@ -24,6 +24,34 @@ type PurchaseResource struct {
SchedulerPeerID string `json:"scheduler_peer_id,omitempty" bson:"scheduler_peer_id,omitempty"`
}
// Extend lists, per requested extension key, the data types reachable from a
// PurchaseResource, building on the embedded AbstractObject extensions:
// peers for the peer keys, the execution, and every purchasable resource kind.
func (ri *PurchaseResource) Extend(typ ...string) map[string][]tools.DataType {
	ext := ri.AbstractObject.Extend(typ...)
	// add appends data types under key, creating the entry on first use.
	add := func(key string, dts ...tools.DataType) {
		if _, present := ext[key]; !present {
			ext[key] = []tools.DataType{}
		}
		ext[key] = append(ext[key], dts...)
	}
	for _, key := range typ {
		switch key {
		case "dest_peer", "scheduler_peer":
			add(key, tools.PEER)
		case "execution":
			add(key, tools.WORKFLOW_EXECUTION)
		case "resource":
			add(key,
				tools.WORKFLOW_RESOURCE,
				tools.DATA_RESOURCE,
				tools.COMPUTE_RESOURCE,
				tools.STORAGE_RESOURCE,
				tools.PROCESSING_RESOURCE,
				tools.SERVICE_RESOURCE,
			)
		}
	}
	return ext
}
// GetAccessor builds the Mongo accessor scoped to purchase resources.
func (d *PurchaseResource) GetAccessor(request *tools.APIRequest) utils.Accessor {
	return NewAccessor(request) // Create a new instance of the accessor
}

View File

@@ -1,6 +1,7 @@
package purchase_resource
import (
"fmt"
"time"
"cloud.o-forge.io/core/oc-lib/logs"
@@ -29,8 +30,9 @@ func NewAccessor(request *tools.APIRequest) *PurchaseResourceMongoAccessor {
*/
func (a *PurchaseResourceMongoAccessor) LoadOne(id string) (utils.DBObject, int, error) {
return utils.GenericLoadOne(id, a.New(), func(d utils.DBObject) (utils.DBObject, int, error) {
fmt.Println("FOUND PURCHASE", d)
if d.(*PurchaseResource).EndDate != nil && time.Now().UTC().After(*d.(*PurchaseResource).EndDate) {
utils.GenericDeleteOne(id, a)
utils.GenericDelete(d, a)
return nil, 404, nil
}
return d, 200, nil
@@ -40,9 +42,13 @@ func (a *PurchaseResourceMongoAccessor) LoadOne(id string) (utils.DBObject, int,
func (a *PurchaseResourceMongoAccessor) GetExec(isDraft bool) func(utils.DBObject) utils.ShallowDBObject {
return func(d utils.DBObject) utils.ShallowDBObject {
if d.(*PurchaseResource).EndDate != nil && time.Now().UTC().After(*d.(*PurchaseResource).EndDate) {
utils.GenericDeleteOne(d.GetID(), a)
utils.GenericDelete(d, a)
return nil
}
return d
}
}
// ShouldVerifyAuth disables auth verification for purchase-resource access.
// NOTE(review): explicitly marked as a temporary bypass — this leaves purchase
// records unprotected and must be reverted before production.
func (dca *PurchaseResourceMongoAccessor) ShouldVerifyAuth() bool {
	return false // TEMP : by pass
}

View File

@@ -4,6 +4,7 @@ import (
"encoding/json"
"errors"
"slices"
"time"
"cloud.o-forge.io/core/oc-lib/config"
"cloud.o-forge.io/core/oc-lib/dbs"
@@ -11,6 +12,7 @@ import (
"cloud.o-forge.io/core/oc-lib/models/common/models"
"cloud.o-forge.io/core/oc-lib/models/common/pricing"
"cloud.o-forge.io/core/oc-lib/models/peer"
"cloud.o-forge.io/core/oc-lib/models/resources/purchase_resource"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
"github.com/biter777/countries"
@@ -20,7 +22,7 @@ import (
// AbstractResource is the struct containing all of the attributes commons to all ressources
type AbstractResource struct {
utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name)
PurchaseID string `json:"purchase_id,omitempty"` // is_buy precise if a resource is buy or not
Type string `json:"type,omitempty" bson:"type,omitempty"` // Type is the type of the resource
Logo string `json:"logo,omitempty" bson:"logo,omitempty"` // Logo is the logo of the resource
Description string `json:"description,omitempty" bson:"description,omitempty"` // Description is the description of the resource
@@ -28,12 +30,61 @@ type AbstractResource struct {
Owners []utils.Owner `json:"owners,omitempty" bson:"owners,omitempty"` // Owners is the list of owners of the resource
UsageRestrictions string `bson:"usage_restrictions,omitempty" json:"usage_restrictions,omitempty"`
AllowedBookingModes map[booking.BookingMode]*pricing.PricingVariation `bson:"allowed_booking_modes" json:"allowed_booking_modes"`
Env []models.Param `json:"env,omitempty" bson:"env,omitempty"`
Inputs []models.Param `json:"inputs,omitempty" bson:"inputs,omitempty"`
Outputs []models.Param `json:"outputs,omitempty" bson:"outputs,omitempty"`
}
// Extend reports, per requested extension key, the data types reachable from
// an AbstractResource, on top of the embedded AbstractObject extensions.
func (ri *AbstractResource) Extend(typ ...string) map[string][]tools.DataType {
	dt := ri.AbstractObject.Extend(typ...)
	for _, key := range typ {
		if key != "purchase" {
			continue
		}
		if _, present := dt[key]; !present {
			dt[key] = []tools.DataType{}
		}
		dt[key] = append(dt[key], tools.PURCHASE_RESOURCE)
	}
	return dt
}
// VerifyBuy looks up (as admin) the first purchase referencing this resource
// and caches its ID in PurchaseID. PurchaseID is left untouched when no
// purchase is found. Search errors are silently ignored: a failed lookup
// reads as "not purchased" — NOTE(review): confirm this best-effort behavior.
func (abs *AbstractResource) VerifyBuy() {
	p := &purchase_resource.PurchaseResource{}
	access := p.GetAccessor(&tools.APIRequest{Admin: true})
	purchase, _, _ := access.Search(&dbs.Filters{
		And: map[string][]dbs.Filter{
			"resource_id": {{Operator: dbs.EQUAL.String(), Value: abs.GetID()}},
		},
	}, "", false, 0, 1)
	if len(purchase) > 0 {
		abs.PurchaseID = purchase[0].GetID()
	}
}
// GetEnv returns the resource's environment parameters.
func (abs *AbstractResource) GetEnv() []models.Param {
	return abs.Env
}

// GetInputs returns the resource's input parameters.
func (abs *AbstractResource) GetInputs() []models.Param {
	return abs.Inputs
}

// GetOutputs returns the resource's output parameters.
func (abs *AbstractResource) GetOutputs() []models.Param {
	return abs.Outputs
}
// FilterPeer returns additional DB filters to apply for the given peer.
// A nil result means no peer-based filtering is applied at this level.
func (abs *AbstractResource) FilterPeer(peerID string) *dbs.Filters {
	return nil
}
// ClearEnv resets env, inputs and outputs to empty (non-nil) slices and
// returns the resource itself for chaining.
func (ri *AbstractResource) ClearEnv() utils.DBObject {
	ri.Env = []models.Param{}
	ri.Inputs = []models.Param{}
	ri.Outputs = []models.Param{}
	return ri
}
func (r *AbstractResource) GetBookingModes() map[booking.BookingMode]*pricing.PricingVariation {
if len(r.AllowedBookingModes) == 0 {
return map[booking.BookingMode]*pricing.PricingVariation{
@@ -52,17 +103,13 @@ func (r *AbstractResource) GetType() string {
}
// StoreDraftDefault marks a freshly stored resource as draft.
func (r *AbstractResource) StoreDraftDefault() {
	r.IsDraft = true
	// r.IsDraft = true — for the moment we bypass this. (translated from French)
}
// CanUpdate allows updates only while the resource is still a draft; the
// candidate change set is returned unchanged.
func (r *AbstractResource) CanUpdate(set utils.DBObject) (bool, utils.DBObject) {
	return r.IsDraft, set
}
// CanDelete allows deletion only for draft resources. (The original inline
// comment said "bookings" — a copy-paste from another model.)
func (r *AbstractResource) CanDelete() bool {
	return r.IsDraft // only draft resources can be deleted
}
type AbstractInstanciatedResource[T ResourceInstanceITF] struct {
AbstractResource // AbstractResource contains the basic fields of an object (id, name)
@@ -136,13 +183,6 @@ func ConvertToPricedResource[T pricing.PricingProfileITF](t tools.DataType,
}, nil
}
// ClearEnv wipes the env/input/output parameters of every instance and
// returns the resource itself for chaining.
func (abs *AbstractInstanciatedResource[T]) ClearEnv() utils.DBObject {
	for i := range abs.Instances {
		abs.Instances[i].ClearEnv()
	}
	return abs
}
func (r *AbstractInstanciatedResource[T]) GetSelectedInstance(selected *int) ResourceInstanceITF {
if selected != nil && len(r.Instances) > *selected {
return r.Instances[*selected]
@@ -175,13 +215,20 @@ func VerifyAuthAction[T ResourceInstanceITF](baseInstance []T, request *tools.AP
if len(instanceID) > 0 && !slices.Contains(instanceID, instance.GetID()) {
continue
}
// Structurally peerless instances (no creator, no partnerships, non-empty Ref)
// are freely accessible by any requester.
if instance.IsPeerless() {
instances = append(instances, instance)
continue
}
_, peerGroups := instance.GetPeerGroups()
for _, peers := range peerGroups {
if request == nil {
continue
}
if grps, ok := peers[request.PeerID]; ok || config.GetConfig().Whitelist {
if (ok && slices.Contains(grps, "*")) || (!ok && config.GetConfig().Whitelist) {
_, allOK := peers["*"]
if grps, ok := peers[request.PeerID]; ok || allOK || config.GetConfig().Whitelist {
if allOK || (ok && slices.Contains(grps, "*")) || (!ok && config.GetConfig().Whitelist) {
instance.FilterInstance(request.PeerID)
instances = append(instances, instance)
// TODO filter Partners + Profiles...
@@ -206,12 +253,12 @@ type GeoPoint struct {
type ResourceInstance[T ResourcePartnerITF] struct {
utils.AbstractObject
ContentType string `json:"content_type,omitempty" bson:"content_type,omitempty"`
LastUpdate time.Time `json:"last_update,omitempty" bson:"last_update,omitempty"`
Origin OriginMeta `json:"origin,omitempty" bson:"origin,omitempty"`
Location GeoPoint `json:"location,omitempty" bson:"location,omitempty"`
Country countries.CountryCode `json:"country,omitempty" bson:"country,omitempty"`
AccessProtocol string `json:"access_protocol,omitempty" bson:"access_protocol,omitempty"`
Env []models.Param `json:"env,omitempty" bson:"env,omitempty"`
Inputs []models.Param `json:"inputs,omitempty" bson:"inputs,omitempty"`
Outputs []models.Param `json:"outputs,omitempty" bson:"outputs,omitempty"`
Partnerships []T `json:"partnerships,omitempty" bson:"partnerships,omitempty"`
@@ -231,10 +278,23 @@ func NewInstance[T ResourcePartnerITF](name string) *ResourceInstance[T] {
}
}
// GetOrigin returns the provenance metadata attached to this instance.
func (ri *ResourceInstance[T]) GetOrigin() OriginMeta {
	return ri.Origin
}
// IsPeerless reports whether the instance has no owning peer while still
// carrying a registry reference. The decision is purely structural —
// CreatorID, Partnerships and Origin.Ref — and deliberately ignores the
// self-declared Origin.Type field, so metadata manipulation cannot be used
// to bypass authorization:
//
//	CreatorID == "" ∧ len(Partnerships) == 0 ∧ Origin.Ref != ""
func (ri *ResourceInstance[T]) IsPeerless() bool {
	hasOwner := ri.CreatorID != ""
	hasPartnerships := len(ri.Partnerships) > 0
	return !hasOwner && !hasPartnerships && ri.Origin.Ref != ""
}
func (ri *ResourceInstance[T]) FilterInstance(peerID string) {
partnerships := []T{}
for _, p := range ri.Partnerships {
if p.GetPeerGroups()[peerID] != nil {
if p.GetPeerGroups()["*"] != nil || p.GetPeerGroups()[peerID] != nil {
p.FilterPartnership(peerID)
partnerships = append(partnerships, p)
}
@@ -242,13 +302,10 @@ func (ri *ResourceInstance[T]) FilterInstance(peerID string) {
ri.Partnerships = partnerships
}
// ClearEnv resets Env, Inputs and Outputs to empty (non-nil) slices.
func (ri *ResourceInstance[T]) ClearEnv() {
	ri.Env = make([]models.Param, 0)
	ri.Inputs = make([]models.Param, 0)
	ri.Outputs = make([]models.Param, 0)
}
func (ri *ResourceInstance[T]) GetProfile(peerID string, partnershipIndex *int, buyingIndex *int, strategyIndex *int) pricing.PricingProfileITF {
if ri.IsPeerless() {
return pricing.GetDefaultPricingProfile()
}
if partnershipIndex != nil && len(ri.Partnerships) > *partnershipIndex {
prts := ri.Partnerships[*partnershipIndex]
return prts.GetProfile(buyingIndex, strategyIndex)
@@ -262,6 +319,9 @@ func (ri *ResourceInstance[T]) GetProfile(peerID string, partnershipIndex *int,
}
func (ri *ResourceInstance[T]) GetPricingsProfiles(peerID string, groups []string) []pricing.PricingProfileITF {
if ri.IsPeerless() {
return []pricing.PricingProfileITF{pricing.GetDefaultPricingProfile()}
}
pricings := []pricing.PricingProfileITF{}
for _, p := range ri.Partnerships {
pricings = append(pricings, p.GetPricingsProfiles(peerID, groups)...)
@@ -277,6 +337,10 @@ func (ri *ResourceInstance[T]) GetPricingsProfiles(peerID string, groups []strin
}
func (ri *ResourceInstance[T]) GetPeerGroups() ([]ResourcePartnerITF, []map[string][]string) {
// Structurally peerless: universally accessible — wildcard on all peers.
if ri.IsPeerless() {
return []ResourcePartnerITF{}, []map[string][]string{{"*": {"*"}}}
}
groups := []map[string][]string{}
partners := []ResourcePartnerITF{}
for _, p := range ri.Partnerships {
@@ -325,11 +389,17 @@ type ResourcePartnerShip[T pricing.PricingProfileITF] struct {
}
func (ri *ResourcePartnerShip[T]) FilterPartnership(peerID string) {
if ri.PeerGroups[peerID] == nil {
ri.PeerGroups = map[string][]string{}
} else {
if ri.PeerGroups["*"] == nil && ri.PeerGroups[peerID] == nil {
ri.PeerGroups = map[string][]string{
peerID: ri.PeerGroups[peerID],
"*": {"*"},
}
} else {
ri.PeerGroups = map[string][]string{}
if ri.PeerGroups["*"] != nil {
ri.PeerGroups["*"] = ri.PeerGroups["*"]
}
if ri.PeerGroups[peerID] != nil {
ri.PeerGroups[peerID] = ri.PeerGroups[peerID]
}
}
}
@@ -355,7 +425,15 @@ Une bill (facture) représente alors... l'emission d'une facture à un instant T
*/
func (ri *ResourcePartnerShip[T]) GetPricingsProfiles(peerID string, groups []string) []pricing.PricingProfileITF {
profiles := []pricing.PricingProfileITF{}
if ri.PeerGroups[peerID] == nil {
if ri.PeerGroups["*"] == nil && ri.PeerGroups[peerID] == nil {
return profiles
}
if ri.PeerGroups["*"] != nil {
for _, ri := range ri.PricingProfiles {
for _, i := range ri {
profiles = append(profiles, i)
}
}
return profiles
}
for _, p := range ri.PeerGroups[peerID] {
@@ -387,6 +465,7 @@ func (rp *ResourcePartnerShip[T]) GetPeerGroups() map[string][]string {
return rp.PeerGroups
}
return map[string][]string{
"*": {"*"},
pp.GetID(): {"*"},
}
}
@@ -432,6 +511,12 @@ func ToResource(
return nil, err
}
return &data, nil
case tools.SERVICE_RESOURCE.EnumIndex():
var data ServiceResource
if err := json.Unmarshal(payload, &data); err != nil {
return nil, err
}
return &data, nil
}
return nil, errors.New("can't found any data resources matching")
}

View File

@@ -12,15 +12,14 @@ import (
type ResourceMongoAccessor[T ResourceInterface] struct {
utils.AbstractAccessor[ResourceInterface] // AbstractAccessor contains the basic fields of an accessor (model, caller)
generateData func() utils.DBObject
}
// New creates a new instance of the computeMongoAccessor
func NewAccessor[T ResourceInterface](t tools.DataType, request *tools.APIRequest, g func() utils.DBObject) *ResourceMongoAccessor[T] {
func NewAccessor[T ResourceInterface](t tools.DataType, request *tools.APIRequest) *ResourceMongoAccessor[T] {
if !slices.Contains([]tools.DataType{
tools.COMPUTE_RESOURCE, tools.STORAGE_RESOURCE,
tools.PROCESSING_RESOURCE, tools.WORKFLOW_RESOURCE,
tools.DATA_RESOURCE, tools.NATIVE_TOOL,
tools.PROCESSING_RESOURCE, tools.SERVICE_RESOURCE,
tools.WORKFLOW_RESOURCE, tools.DATA_RESOURCE, tools.NATIVE_TOOL,
}, t) {
return nil
}
@@ -37,6 +36,8 @@ func NewAccessor[T ResourceInterface](t tools.DataType, request *tools.APIReques
return &StorageResource{}
case tools.PROCESSING_RESOURCE:
return &ProcessingResource{}
case tools.SERVICE_RESOURCE:
return &ServiceResource{}
case tools.WORKFLOW_RESOURCE:
return &WorkflowResource{}
case tools.DATA_RESOURCE:
@@ -47,7 +48,6 @@ func NewAccessor[T ResourceInterface](t tools.DataType, request *tools.APIReques
return nil
},
},
generateData: g,
}
}
@@ -55,6 +55,16 @@ func NewAccessor[T ResourceInterface](t tools.DataType, request *tools.APIReques
* Nothing special here, just the basic CRUD operations
*/
// LoadOne loads a single resource by id, then applies post-load processing:
// purchase verification and instance filtering for the requesting peer.
// On error (or missing data) the underlying result is returned untouched.
func (dca *ResourceMongoAccessor[T]) LoadOne(id string) (utils.DBObject, int, error) {
	data, code, err := dca.AbstractAccessor.LoadOne(id)
	if err != nil || data == nil {
		// Nothing to post-process; guard also prevents a nil type assertion.
		return data, code, err
	}
	data.(T).VerifyBuy()
	data.(T).SetAllowedInstances(dca.Request)
	return data, code, nil
}
func (dca *ResourceMongoAccessor[T]) UpdateOne(set map[string]interface{}, id string) (utils.DBObject, int, error) {
if dca.GetType() == tools.COMPUTE_RESOURCE {
return nil, 404, errors.New("can't update a non existing computing units resource not reported onto compute units catalog")
@@ -80,26 +90,29 @@ func (dca *ResourceMongoAccessor[T]) CopyOne(data utils.DBObject) (utils.DBObjec
return dca.StoreOne(data)
}
func (wfa *ResourceMongoAccessor[T]) LoadAll(isDraft bool) ([]utils.ShallowDBObject, int, error) {
return utils.GenericLoadAll[T](wfa.GetExec(isDraft), isDraft, wfa)
func (wfa *ResourceMongoAccessor[T]) LoadAll(isDraft bool, offset int64, limit int64) ([]utils.ShallowDBObject, int, error) {
return utils.GenericLoadAll[T](wfa.GetExec(isDraft), isDraft, wfa, offset, limit)
}
func (wfa *ResourceMongoAccessor[T]) Search(filters *dbs.Filters, search string, isDraft bool) ([]utils.ShallowDBObject, int, error) {
func (wfa *ResourceMongoAccessor[T]) Search(filters *dbs.Filters, search string, isDraft bool, offset int64, limit int64) ([]utils.ShallowDBObject, int, error) {
if filters == nil && search == "*" {
return utils.GenericLoadAll[T](func(d utils.DBObject) utils.ShallowDBObject {
d.(T).VerifyBuy()
d.(T).SetAllowedInstances(wfa.Request)
return d
}, isDraft, wfa)
}, isDraft, wfa, offset, limit)
}
return utils.GenericSearch[T](filters, search, wfa.GetObjectFilters(search),
func(d utils.DBObject) utils.ShallowDBObject {
d.(T).VerifyBuy()
d.(T).SetAllowedInstances(wfa.Request)
return d
}, isDraft, wfa)
}, isDraft, wfa, offset, limit)
}
func (a *ResourceMongoAccessor[T]) GetExec(isDraft bool) func(utils.DBObject) utils.ShallowDBObject {
return func(d utils.DBObject) utils.ShallowDBObject {
d.(T).VerifyBuy()
d.(T).SetAllowedInstances(a.Request)
return d
}
@@ -108,12 +121,11 @@ func (a *ResourceMongoAccessor[T]) GetExec(isDraft bool) func(utils.DBObject) ut
// GetObjectFilters builds the default OR search filter over name, type,
// short_description, description and owner name. The diff residue that left
// the old misspelled "abstractintanciatedresource" keys alongside the
// corrected "abstractinstanciatedresource" ones is removed: only the
// corrected paths remain.
func (abs *ResourceMongoAccessor[T]) GetObjectFilters(search string) *dbs.Filters {
	return &dbs.Filters{
		Or: map[string][]dbs.Filter{ // filter by like name, type, short_description, description, owner if no filters are provided
			"abstractinstanciatedresource.abstractresource.abstractobject.name": {{Operator: dbs.LIKE.String(), Value: search}},
			"abstractinstanciatedresource.abstractresource.type":                {{Operator: dbs.LIKE.String(), Value: search}},
			"abstractinstanciatedresource.abstractresource.short_description":   {{Operator: dbs.LIKE.String(), Value: search}},
			"abstractinstanciatedresource.abstractresource.description":         {{Operator: dbs.LIKE.String(), Value: search}},
			"abstractinstanciatedresource.abstractresource.owners.name":         {{Operator: dbs.LIKE.String(), Value: search}},
		},
	}
}

209
models/resources/service.go Executable file
View File

@@ -0,0 +1,209 @@
package resources
import (
"errors"
"time"
"cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/common/models"
"cloud.o-forge.io/core/oc-lib/models/common/pricing"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
"github.com/google/uuid"
)
// ServiceMode selects how a service is consumed and billed.
type ServiceMode int

const (
	DEPLOYMENT ServiceMode = iota // deploy the service, pay for uptime — duration unbounded
	HOSTED                        // use an existing service, pay per call — duration per request
)

// String returns the symbolic name of the mode. Out-of-range values return
// "UNKNOWN" instead of panicking on an array index (the previous
// implementation panicked for any value other than 0 or 1).
func (m ServiceMode) String() string {
	names := [...]string{"DEPLOYMENT", "HOSTED"}
	if m < 0 || int(m) >= len(names) {
		return "UNKNOWN"
	}
	return names[m]
}
// ServiceProtocol identifies the wire protocol a service speaks.
type ServiceProtocol int

const (
	HTTP ServiceProtocol = iota
	GRPC
	WEBSOCKET
	TCP
)

// String returns the symbolic name of the protocol. Out-of-range values
// return "UNKNOWN" instead of panicking on an array index.
func (p ServiceProtocol) String() string {
	names := [...]string{"HTTP", "GRPC", "WEBSOCKET", "TCP"}
	if p < 0 || int(p) >= len(names) {
		return "UNKNOWN"
	}
	return names[p]
}
// ServiceUsage describes the expected footprint of a running service:
// compute (CPUs, GPUs, RAM), storage, and free-text sizing notes.
type ServiceUsage struct {
	CPUs map[string]*models.CPU `bson:"cpus,omitempty" json:"cpus,omitempty"` // CPU requirements per entry
	GPUs map[string]*models.GPU `bson:"gpus,omitempty" json:"gpus,omitempty"` // GPU requirements per entry
	RAM *models.RAM `bson:"ram,omitempty" json:"ram,omitempty"` // memory requirement
	StorageGb float64 `bson:"storage,omitempty" json:"storage,omitempty"` // storage footprint in GB
	Hypothesis string `bson:"hypothesis,omitempty" json:"hypothesis,omitempty"` // free-text sizing hypothesis
	ScalingModel string `bson:"scaling_model,omitempty" json:"scaling_model,omitempty"` // free-text scaling description
}
// ServiceResourceAccess describes how to reach the service once running.
// Populated for HOSTED instances (endpoint already known) and as a template for DEPLOYMENT.
type ServiceResourceAccess struct {
	Container *models.Container `json:"container,omitempty" bson:"container,omitempty"` // container description, when the service ships as one
	Protocol ServiceProtocol `json:"protocol" bson:"protocol" default:"0"` // wire protocol; defaults to HTTP (0)
	EndpointPattern string `json:"endpoint_pattern,omitempty" bson:"endpoint_pattern,omitempty"` // address/URL pattern of the endpoint
	HealthCheckPath string `json:"health_check_path,omitempty" bson:"health_check_path,omitempty"` // path probed to check the service is alive
}
// ServiceResource is a catalog entry for a deployable or hosted service,
// instantiated through ServiceInstance entries.
type ServiceResource struct {
	AbstractInstanciatedResource[*ServiceInstance]
	Infrastructure enum.InfrastructureType `json:"infrastructure" bson:"infrastructure" default:"-1"` // -1 = unspecified infrastructure
	Usage *ServiceUsage `bson:"usage,omitempty" json:"usage,omitempty"` // expected resource footprint
	OpenSource bool `json:"open_source" bson:"open_source" default:"false"`
	License string `json:"license,omitempty" bson:"license,omitempty"`
	Maturity string `json:"maturity,omitempty" bson:"maturity,omitempty"` // free-text maturity level — TODO confirm expected values
}
// GetType returns the service resource type identifier.
func (r *ServiceResource) GetType() string {
	return tools.SERVICE_RESOURCE.String()
}
// GetAccessor returns a Mongo accessor scoped to service resources for the
// given API request.
func (d *ServiceResource) GetAccessor(request *tools.APIRequest) utils.Accessor {
	return NewAccessor[*ServiceResource](tools.SERVICE_RESOURCE, request)
}
// ConvertToPricedResource wraps the generic conversion and returns the
// service-specific priced view. Only tools.SERVICE_RESOURCE is accepted.
func (abs *ServiceResource) ConvertToPricedResource(t tools.DataType, selectedInstance *int, selectedPartnership *int, selectedBuyingStrategy *int, selectedStrategy *int, selectedBookingModeIndex *int, request *tools.APIRequest) (pricing.PricedItemITF, error) {
	if t != tools.SERVICE_RESOURCE {
		return nil, errors.New("not the proper type expected : cannot convert to priced resource : have " + t.String() + " wait Service")
	}
	generic, err := ConvertToPricedResource[*ServiceResourcePricingProfile](t, selectedInstance, selectedPartnership, selectedBuyingStrategy, selectedStrategy, selectedBookingModeIndex, abs, request)
	if err != nil {
		return nil, err
	}
	base := generic.(*PricedResource[*ServiceResourcePricingProfile])
	return &PricedServiceResource{PricedResource: *base}, nil
}
// ServiceInstance is one concrete deployment/hosting of a ServiceResource.
type ServiceInstance struct {
	ResourceInstance[*ServiceResourcePartnership]
	Mode ServiceMode `json:"mode" bson:"mode" default:"0"` // defaults to DEPLOYMENT (0)
	Access *ServiceResourceAccess `json:"access,omitempty" bson:"access,omitempty"` // how to reach the service once running
}
// IsPeerless is always false for service instances, overriding the
// structural check on ResourceInstance: services are never peerless.
func (ri *ServiceInstance) IsPeerless() bool { return false }
// NewServiceInstance builds a ServiceInstance with a fresh UUID and the
// given name.
//
// NOTE(review): the peerID parameter is currently unused — neither
// CreatorID nor any partnership is initialized from it. Confirm whether
// this is intentional (the constructor signature mirrors
// NewStorageResourceInstance).
func NewServiceInstance(name string, peerID string) ResourceInstanceITF {
	return &ServiceInstance{
		ResourceInstance: ResourceInstance[*ServiceResourcePartnership]{
			AbstractObject: utils.AbstractObject{
				UUID: uuid.New().String(),
				Name: name,
			},
		},
	}
}
// ServiceResourcePartnership binds a service resource to peer groups and
// service pricing profiles; it adds nothing beyond the generic partnership.
type ServiceResourcePartnership struct {
	ResourcePartnerShip[*ServiceResourcePricingProfile]
}
// ServiceResourcePricingProfile handles both service modes:
//   - DEPLOYMENT: uptime billing via ExploitPricingProfile (pay while service is up)
//   - HOSTED: per-call billing via AccessPricingProfile (pay per request)
//
// Mode selects which of the two embedded profiles is consulted; the other
// is ignored (both are lazily created by ensure()).
type ServiceResourcePricingProfile struct {
	Mode ServiceMode `json:"mode" bson:"mode"`
	UptimePricing *pricing.ExploitPricingProfile[pricing.TimePricingStrategy] `json:"uptime_pricing,omitempty" bson:"uptime_pricing,omitempty"` // used when Mode == DEPLOYMENT
	AccessPricing *pricing.AccessPricingProfile[pricing.TimePricingStrategy] `json:"access_pricing,omitempty" bson:"access_pricing,omitempty"` // used when Mode == HOSTED
}
// ensure lazily creates both pricing profiles so that mode-dispatched
// calls never dereference a nil profile.
func (p *ServiceResourcePricingProfile) ensure() {
	if p.AccessPricing == nil {
		p.AccessPricing = &pricing.AccessPricingProfile[pricing.TimePricingStrategy]{}
	}
	if p.UptimePricing == nil {
		p.UptimePricing = &pricing.ExploitPricingProfile[pricing.TimePricingStrategy]{}
	}
}
// IsPurchasable delegates to the pricing profile selected by Mode.
func (p *ServiceResourcePricingProfile) IsPurchasable() bool {
	p.ensure()
	switch p.Mode {
	case DEPLOYMENT:
		return p.UptimePricing.IsPurchasable()
	default:
		return p.AccessPricing.IsPurchasable()
	}
}
// IsBooked delegates to the pricing profile selected by Mode.
func (p *ServiceResourcePricingProfile) IsBooked() bool {
	p.ensure()
	switch p.Mode {
	case DEPLOYMENT:
		return p.UptimePricing.IsBooked()
	default:
		return p.AccessPricing.IsBooked()
	}
}
// GetPurchase returns the buying strategy of the mode-selected profile.
func (p *ServiceResourcePricingProfile) GetPurchase() pricing.BuyingStrategy {
	p.ensure()
	switch p.Mode {
	case DEPLOYMENT:
		return p.UptimePricing.GetPurchase()
	default:
		return p.AccessPricing.GetPurchase()
	}
}
// GetOverrideStrategyValue returns -1 for services.
// NOTE(review): -1 assumed to be the "no override" sentinel — confirm
// against the pricing profile interface.
func (p *ServiceResourcePricingProfile) GetOverrideStrategyValue() int {
	return -1
}
// GetPriceHT computes the tax-free price over the given window, dispatching
// to uptime pricing for DEPLOYMENT and access pricing for HOSTED.
func (p *ServiceResourcePricingProfile) GetPriceHT(quantity float64, val float64, start time.Time, end time.Time, variations []*pricing.PricingVariation, params ...string) (float64, error) {
	p.ensure()
	switch p.Mode {
	case DEPLOYMENT:
		return p.UptimePricing.GetPriceHT(quantity, val, start, end, variations, params...)
	default:
		return p.AccessPricing.GetPriceHT(quantity, val, start, end, variations, params...)
	}
}
// PricedServiceResource is the priced view of a ServiceResource, carrying
// the selected ServiceResourcePricingProfile.
type PricedServiceResource struct {
	PricedResource[*ServiceResourcePricingProfile]
}
// ensurePricing lazily creates an empty pricing profile so delegating
// methods never hit a nil SelectedPricing.
func (r *PricedServiceResource) ensurePricing() {
	if r.SelectedPricing != nil {
		return
	}
	r.SelectedPricing = &ServiceResourcePricingProfile{}
}
// IsPurchasable delegates to the (lazily created) selected pricing profile.
func (r *PricedServiceResource) IsPurchasable() bool {
	r.ensurePricing()
	return r.SelectedPricing.IsPurchasable()
}
// IsBooked delegates to the (lazily created) selected pricing profile.
func (r *PricedServiceResource) IsBooked() bool {
	r.ensurePricing()
	return r.SelectedPricing.IsBooked()
}
// GetType identifies this priced item as a service resource.
func (r *PricedServiceResource) GetType() tools.DataType {
	return tools.SERVICE_RESOURCE
}
// GetPriceHT returns the tax-free price after making sure a pricing
// profile is present.
func (r *PricedServiceResource) GetPriceHT() (float64, error) {
	r.ensurePricing()
	return r.PricedResource.GetPriceHT()
}
// GetExplicitDurationInS returns -1 for DEPLOYMENT (unbounded uptime).
// For HOSTED, it returns the explicitly configured booking duration if set,
// then the booked usage window, and falls back to a 5-minute default when
// neither is available.
//
// Receiver renamed from `a` to `r` for consistency with every other
// PricedServiceResource method.
func (r *PricedServiceResource) GetExplicitDurationInS() float64 {
	r.ensurePricing()
	if r.SelectedPricing.Mode == DEPLOYMENT {
		return -1
	}
	if r.BookingConfiguration == nil {
		r.BookingConfiguration = &BookingConfiguration{}
	}
	if r.BookingConfiguration.ExplicitBookingDurationS != 0 {
		return r.BookingConfiguration.ExplicitBookingDurationS
	}
	if r.BookingConfiguration.UsageStart == nil || r.BookingConfiguration.UsageEnd == nil {
		return (5 * time.Minute).Seconds()
	}
	return r.BookingConfiguration.UsageEnd.Sub(*r.BookingConfiguration.UsageStart).Seconds()
}

View File

@@ -23,7 +23,7 @@ type StorageResource struct {
}
// GetAccessor returns a Mongo accessor scoped to storage resources.
// Diff residue removed: the old three-argument NewAccessor call (with a
// generator func) coexisted with the new two-argument one; only the new
// form is kept.
func (d *StorageResource) GetAccessor(request *tools.APIRequest) utils.Accessor {
	return NewAccessor[*StorageResource](tools.STORAGE_RESOURCE, request) // Create a new instance of the accessor
}
func (r *StorageResource) GetType() string {
@@ -44,6 +44,15 @@ func (abs *StorageResource) ConvertToPricedResource(t tools.DataType, selectedIn
}, nil
}
// StoreDraftDefault applies the base draft defaults and guarantees the
// default "source" env parameter is present exactly once. The previous
// version appended unconditionally, so repeated calls accumulated
// duplicate "source" entries (the deleted per-instance version deduped).
func (ri *StorageResource) StoreDraftDefault() {
	ri.AbstractObject.StoreDraftDefault()
	for _, p := range ri.Env {
		if p.Attr == "source" {
			return // already present — do not duplicate
		}
	}
	ri.Env = append(ri.Env, models.Param{
		Attr:     "source",
		Value:    "[resource]instance.source",
		Readonly: true,
	})
}
type StorageResourceInstance struct {
ResourceInstance[*StorageResourcePartnership]
Source string `bson:"source,omitempty" json:"source,omitempty"` // Source is the source of the storage
@@ -57,6 +66,10 @@ type StorageResourceInstance struct {
Throughput string `bson:"throughput,omitempty" json:"throughput,omitempty"` // Throughput is the throughput of the storage
}
// IsPeerless is always false for storage instances: a storage resource is
// infrastructure owned by a peer and can never be declared peerless.
// This overrides the structural check on the embedded ResourceInstance.
func (ri *StorageResourceInstance) IsPeerless() bool { return false }
func NewStorageResourceInstance(name string, peerID string) ResourceInstanceITF {
return &StorageResourceInstance{
ResourceInstance: ResourceInstance[*StorageResourcePartnership]{
@@ -68,30 +81,6 @@ func NewStorageResourceInstance(name string, peerID string) ResourceInstanceITF
}
}
func (ri *StorageResourceInstance) ClearEnv() {
ri.Env = []models.Param{}
ri.Inputs = []models.Param{}
ri.Outputs = []models.Param{}
}
func (ri *StorageResourceInstance) StoreDraftDefault() {
found := false
for _, p := range ri.ResourceInstance.Env {
if p.Attr == "source" {
found = true
break
}
}
if !found {
ri.ResourceInstance.Env = append(ri.ResourceInstance.Env, models.Param{
Attr: "source",
Value: ri.Source,
Readonly: true,
})
}
ri.ResourceInstance.StoreDraftDefault()
}
type StorageResourcePartnership struct {
ResourcePartnerShip[*StorageResourcePricingProfile]
MaxSizeGBAllowed float64 `json:"allowed_gb,omitempty" bson:"allowed_gb,omitempty"`
@@ -117,7 +106,7 @@ func (t PrivilegeStoragePricingStrategy) String() string {
type StorageResourcePricingStrategy int
const (
PER_DATA_STORED StorageResourcePricingStrategy = iota + 6
PER_DATA_STORED StorageResourcePricingStrategy = iota + 7
PER_TB_STORED
PER_GB_STORED
PER_MB_STORED

View File

@@ -4,7 +4,6 @@ import (
"testing"
"time"
"cloud.o-forge.io/core/oc-lib/models/common/models"
"cloud.o-forge.io/core/oc-lib/models/common/pricing"
"cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/tools"
@@ -34,23 +33,6 @@ func TestDataResource_ConvertToPricedResource(t *testing.T) {
assert.Nil(t, nilRes)
}
func TestDataInstance_StoreDraftDefault(t *testing.T) {
di := &resources.DataInstance{
Source: "test-src",
ResourceInstance: resources.ResourceInstance[*resources.DataResourcePartnership]{
Env: []models.Param{},
},
}
di.StoreDraftDefault()
assert.Len(t, di.ResourceInstance.Env, 1)
assert.Equal(t, "source", di.ResourceInstance.Env[0].Attr)
assert.Equal(t, "test-src", di.ResourceInstance.Env[0].Value)
// Call again, should not duplicate
di.StoreDraftDefault()
assert.Len(t, di.ResourceInstance.Env, 1)
}
func TestDataResourcePricingStrategy_GetQuantity(t *testing.T) {
tests := []struct {
strategy resources.DataResourcePricingStrategy

View File

@@ -30,13 +30,6 @@ func TestPricedProcessingResource_GetExplicitDurationInS(t *testing.T) {
input PricedProcessingResource
expected float64
}{
{
name: "Service without explicit duration",
input: PricedProcessingResource{
IsService: true,
},
expected: -1,
},
{
name: "Nil start time, non-service",
input: PricedProcessingResource{

View File

@@ -114,8 +114,6 @@ func (f *FakeResource) ConvertToPricedResource(t tools.DataType, a *int, b *int,
func (f *FakeResource) VerifyAuth(string, *tools.APIRequest) bool { return true }
func TestNewAccessor_ReturnsValid(t *testing.T) {
acc := resources.NewAccessor[*FakeResource](tools.COMPUTE_RESOURCE, &tools.APIRequest{}, func() utils.DBObject {
return &FakeResource{}
})
acc := resources.NewAccessor[*FakeResource](tools.COMPUTE_RESOURCE, &tools.APIRequest{})
assert.NotNil(t, acc)
}

View File

@@ -3,7 +3,6 @@ package resources_test
import (
"testing"
"cloud.o-forge.io/core/oc-lib/models/common/models"
"cloud.o-forge.io/core/oc-lib/tools"
"github.com/stretchr/testify/assert"
@@ -37,36 +36,6 @@ func TestStorageResource_ConvertToPricedResource_InvalidType(t *testing.T) {
assert.Nil(t, priced)
}
func TestStorageResourceInstance_ClearEnv(t *testing.T) {
inst := &resources.StorageResourceInstance{
ResourceInstance: resources.ResourceInstance[*resources.StorageResourcePartnership]{
Env: []models.Param{{Attr: "A"}},
Inputs: []models.Param{{Attr: "B"}},
Outputs: []models.Param{{Attr: "C"}},
},
}
inst.ClearEnv()
assert.Empty(t, inst.Env)
assert.Empty(t, inst.Inputs)
assert.Empty(t, inst.Outputs)
}
func TestStorageResourceInstance_StoreDraftDefault(t *testing.T) {
inst := &resources.StorageResourceInstance{
Source: "my-source",
ResourceInstance: resources.ResourceInstance[*resources.StorageResourcePartnership]{
Env: []models.Param{},
},
}
inst.StoreDraftDefault()
assert.Len(t, inst.Env, 1)
assert.Equal(t, "source", inst.Env[0].Attr)
assert.Equal(t, "my-source", inst.Env[0].Value)
assert.True(t, inst.Env[0].Readonly)
}
func TestStorageResourcePricingStrategy_GetQuantity(t *testing.T) {
tests := []struct {
strategy resources.StorageResourcePricingStrategy

View File

@@ -16,7 +16,7 @@ type WorkflowResource struct {
}
func (d *WorkflowResource) GetAccessor(request *tools.APIRequest) utils.Accessor {
return NewAccessor[*WorkflowResource](tools.WORKFLOW_RESOURCE, request, func() utils.DBObject { return &WorkflowResource{} })
return NewAccessor[*WorkflowResource](tools.WORKFLOW_RESOURCE, request)
}
func (r *WorkflowResource) AddInstances(instance ResourceInstanceITF) {

View File

@@ -31,6 +31,7 @@ const (
*/
type AbstractObject struct {
UUID string `json:"id,omitempty" bson:"id,omitempty" validate:"required"`
NotInCatalog bool `json:"not_in_catalog" bson:"not_in_catalog" default:"false"`
Name string `json:"name,omitempty" bson:"name,omitempty" validate:"required"`
IsDraft bool `json:"is_draft" bson:"is_draft" default:"false"`
CreatorID string `json:"creator_id,omitempty" bson:"creator_id,omitempty"`
@@ -43,9 +44,29 @@ type AbstractObject struct {
Signature []byte `bson:"signature,omitempty" json:"signature,omitempty"`
}
// Extend maps each requested extension name to the data types needed to
// resolve it. "creator", "user_creator" and "user_updater" all resolve
// against PEER objects; unrecognized names are silently ignored.
func (ri *AbstractObject) Extend(typ ...string) map[string][]tools.DataType {
	dt := map[string][]tools.DataType{}
	for _, t := range typ {
		switch t {
		case "creator", "user_creator", "user_updater":
			// append on a missing map key yields a fresh slice; the previous
			// explicit pre-initialization was redundant.
			dt[t] = append(dt[t], tools.PEER)
		}
	}
	return dt
}
// GetAccessor returns nil on the base object; concrete types are expected
// to override this with a real accessor.
func (ri *AbstractObject) GetAccessor(request *tools.APIRequest) Accessor {
	return nil
}
// SetNotInCatalog marks whether the object is hidden from the catalog.
func (r *AbstractObject) SetNotInCatalog(ok bool) {
	r.NotInCatalog = ok
}

// IsNotInCatalog reports whether the object is hidden from the catalog.
func (r *AbstractObject) IsNotInCatalog() bool {
	return r.NotInCatalog
}
func (r *AbstractObject) Unsign() {
r.Signature = nil
@@ -259,12 +280,12 @@ func (a *AbstractAccessor[T]) LoadOne(id string) (DBObject, int, error) {
}, a)
}
func (a *AbstractAccessor[T]) LoadAll(isDraft bool) ([]ShallowDBObject, int, error) {
return GenericLoadAll[T](a.GetExec(isDraft), isDraft, a)
func (a *AbstractAccessor[T]) LoadAll(isDraft bool, offset int64, limit int64) ([]ShallowDBObject, int, error) {
return GenericLoadAll[T](a.GetExec(isDraft), isDraft, a, offset, limit)
}
func (a *AbstractAccessor[T]) Search(filters *dbs.Filters, search string, isDraft bool) ([]ShallowDBObject, int, error) {
return GenericSearch[T](filters, search, a.New().GetObjectFilters(search), a.GetExec(isDraft), isDraft, a)
func (a *AbstractAccessor[T]) Search(filters *dbs.Filters, search string, isDraft bool, offset int64, limit int64) ([]ShallowDBObject, int, error) {
return GenericSearch[T](filters, search, a.New().GetObjectFilters(search), a.GetExec(isDraft), isDraft, a, offset, limit)
}
func (a *AbstractAccessor[T]) GetExec(isDraft bool) func(DBObject) ShallowDBObject {

View File

@@ -0,0 +1,60 @@
package utils
import (
"sync"
"cloud.o-forge.io/core/oc-lib/tools"
)
// ChangeEvent is fired whenever a DB object is created, updated or deleted
// within this process (see NotifyChange). Deleted=true means the object was
// removed; Object is the last known snapshot before deletion.
type ChangeEvent struct {
	DataType tools.DataType // type of the object that changed
	ID string // id of the changed object
	Object ShallowDBObject // nil only when the load after the write failed
	Deleted bool // true when the object was removed
}
var (
changeBusMu sync.RWMutex
changeBus = map[tools.DataType][]chan ChangeEvent{}
)
// SubscribeChanges returns a channel that receives ChangeEvents for dt
// whenever an object of that type is written or deleted in this process.
// Call the returned cancel function to unsubscribe; after that the channel
// will no longer receive events (it is not closed — use a context to stop
// reading).
func SubscribeChanges(dt tools.DataType) (<-chan ChangeEvent, func()) {
	ch := make(chan ChangeEvent, 32)
	changeBusMu.Lock()
	changeBus[dt] = append(changeBus[dt], ch)
	changeBusMu.Unlock()
	return ch, func() {
		changeBusMu.Lock()
		defer changeBusMu.Unlock()
		subs := changeBus[dt]
		// Rebuild the subscriber list instead of removing in place:
		// NotifyChange publishes the slice header, so the previously
		// published backing array must never be mutated (the old
		// append(subs[:i], subs[i+1:]...) shifted elements in it and could
		// race with a concurrent broadcast).
		next := make([]chan ChangeEvent, 0, len(subs))
		for _, c := range subs {
			if c != ch {
				next = append(next, c)
			}
		}
		changeBus[dt] = next
	}
}
// NotifyChange broadcasts a ChangeEvent to all current subscribers for dt.
// Non-blocking: events are dropped for subscribers whose buffer is full.
func NotifyChange(dt tools.DataType, id string, obj ShallowDBObject, deleted bool) {
	// Copy the subscriber list while holding the lock: the slice stored in
	// changeBus can be mutated by a concurrent unsubscribe, so iterating the
	// shared backing array after RUnlock (as before) was a data race.
	changeBusMu.RLock()
	subs := make([]chan ChangeEvent, len(changeBus[dt]))
	copy(subs, changeBus[dt])
	changeBusMu.RUnlock()
	evt := ChangeEvent{DataType: dt, ID: id, Object: obj, Deleted: deleted}
	for _, ch := range subs {
		select {
		case ch <- evt:
		default: // subscriber buffer full — drop rather than block the writer
		}
	}
}

View File

@@ -17,6 +17,9 @@ type Owner struct {
}
func VerifyAccess(a Accessor, id string) error {
if a == nil {
return errors.New("no accessor to verify access")
}
data, _, err := a.LoadOne(id)
if err != nil {
return err
@@ -51,7 +54,7 @@ func GenericStoreOne(data DBObject, a Accessor) (DBObject, int, error) {
if a.ShouldVerifyAuth() && !data.VerifyAuth("store", a.GetRequest()) {
return nil, 403, errors.New("you are not allowed to access : " + a.GetType().String())
}
if cursor, _, _ := a.Search(&f, "", data.IsDrafted()); len(cursor) > 0 {
if cursor, _, _ := a.Search(&f, "", data.IsDrafted(), 0, 10); len(cursor) > 0 {
return nil, 409, errors.New(a.GetType().String() + " with name " + data.GetName() + " already exists")
}
err := validate.Struct(data)
@@ -63,7 +66,11 @@ func GenericStoreOne(data DBObject, a Accessor) (DBObject, int, error) {
a.GetLogger().Error().Msg("Could not store " + data.GetName() + " to db. Error: " + err.Error())
return nil, code, err
}
return a.LoadOne(id)
result, rcode, rerr := a.LoadOne(id)
if rerr == nil && result != nil {
go NotifyChange(a.GetType(), result.GetID(), result, false)
}
return result, rcode, rerr
}
// GenericLoadOne loads one object from the database (generic)
@@ -75,20 +82,22 @@ func GenericDeleteOne(id string, a Accessor) (DBObject, int, error) {
if res == nil {
return res, code, errors.New("not found")
}
return GenericDelete(res, a)
}
// GenericDelete removes res from the database after checking that the
// object allows deletion and that the requester is authorized. On success
// it broadcasts a deletion ChangeEvent and returns the deleted snapshot.
// Diff residue removed: a dangling `if err != nil` referencing undeclared
// variables and the superseded id-based DeleteOne call.
func GenericDelete(res DBObject, a Accessor) (DBObject, int, error) {
	if !res.CanDelete() {
		return nil, 403, errors.New("you are not allowed to delete :" + a.GetType().String())
	}
	if a.ShouldVerifyAuth() && !res.VerifyAuth("delete", a.GetRequest()) {
		return nil, 403, errors.New("you are not allowed to access " + a.GetType().String())
	}
	_, code, err := mongo.MONGOService.DeleteOne(res.GetID(), a.GetType().String())
	if err != nil {
		a.GetLogger().Error().Msg("Could not delete " + res.GetID() + " to db. Error: " + err.Error())
		return nil, code, err
	}
	// Broadcast asynchronously so slow subscribers never delay the caller.
	go NotifyChange(a.GetType(), res.GetID(), res, true)
	return res, 200, nil
}
@@ -117,17 +126,26 @@ func ModelGenericUpdateOne(change map[string]interface{}, id string, a Accessor)
}
loaded := r.Serialize(r) // get the loaded object
for k, v := range change { // apply the changes, with a flatten method
loaded[k] = v
}
return r, loaded, 200, nil
newObj := a.NewObj()
b, err = json.Marshal(loaded)
if err != nil {
return nil, loaded, 400, nil
}
err = json.Unmarshal(b, newObj)
if err != nil {
return nil, loaded, 400, nil
}
return newObj, loaded, 200, nil
}
// GenericLoadOne loads one object from the database (generic)
// json expected in entry is a flatted object no need to respect the inheritance hierarchy
func GenericUpdateOne(change map[string]interface{}, id string, a Accessor) (DBObject, int, error) {
obj, loaded, c, err := ModelGenericUpdateOne(change, id, a)
if err != nil {
return nil, c, err
}
@@ -136,7 +154,11 @@ func GenericUpdateOne(change map[string]interface{}, id string, a Accessor) (DBO
a.GetLogger().Error().Msg("Could not update " + id + " to db. Error: " + err.Error())
return nil, code, err
}
return a.LoadOne(id)
result, rcode, rerr := a.LoadOne(id)
if rerr == nil && result != nil {
go NotifyChange(a.GetType(), result.GetID(), result, false)
}
return result, rcode, rerr
}
func GenericLoadOne[T DBObject](id string, data T, f func(DBObject) (DBObject, int, error), a Accessor) (DBObject, int, error) {
@@ -147,7 +169,6 @@ func GenericLoadOne[T DBObject](id string, data T, f func(DBObject) (DBObject, i
if err = res_mongo.Decode(data); err != nil {
return nil, 400, err
}
if a.ShouldVerifyAuth() && !data.VerifyAuth("get", a.GetRequest()) {
return nil, 403, errors.New("you are not allowed to access :" + a.GetType().String())
}
@@ -172,17 +193,27 @@ func genericLoadAll[T DBObject](res *mgb.Cursor, code int, err error, onlyDraft
return objs, 200, nil
}
func GenericLoadAll[T DBObject](f func(DBObject) ShallowDBObject, onlyDraft bool, wfa Accessor) ([]ShallowDBObject, int, error) {
res_mongo, code, err := mongo.MONGOService.LoadAll(wfa.GetType().String())
func GenericLoadAll[T DBObject](f func(DBObject) ShallowDBObject, onlyDraft bool, wfa Accessor, opts ...int64) ([]ShallowDBObject, int, error) {
offset := int64(0)
limit := int64(0)
if len(opts) > 1 {
offset = opts[0]
}
res_mongo, code, err := mongo.MONGOService.LoadAll(wfa.GetType().String(), offset, limit)
return genericLoadAll[T](res_mongo, code, err, onlyDraft, f, wfa)
}
func GenericSearch[T DBObject](filters *dbs.Filters, search string, defaultFilters *dbs.Filters,
f func(DBObject) ShallowDBObject, onlyDraft bool, wfa Accessor) ([]ShallowDBObject, int, error) {
f func(DBObject) ShallowDBObject, onlyDraft bool, wfa Accessor, opts ...int64) ([]ShallowDBObject, int, error) {
if filters == nil && search != "" {
filters = defaultFilters
}
res_mongo, code, err := mongo.MONGOService.Search(filters, wfa.GetType().String())
offset := int64(0)
limit := int64(0)
if len(opts) > 1 {
offset = opts[0]
}
res_mongo, code, err := mongo.MONGOService.Search(filters, wfa.GetType().String(), offset, limit)
return genericLoadAll[T](res_mongo, code, err, onlyDraft, f, wfa)
}
@@ -194,7 +225,11 @@ func GenericRawUpdateOne(set DBObject, id string, a Accessor) (DBObject, int, er
a.GetLogger().Error().Msg("Could not update " + id + " to db. Error: " + err.Error())
return nil, code, err
}
return a.LoadOne(id)
result, rcode, rerr := a.LoadOne(id)
if rerr == nil && result != nil {
go NotifyChange(a.GetType(), result.GetID(), result, false)
}
return result, rcode, rerr
}
func GetMySelf(wfa Accessor) (ShallowDBObject, error) {
@@ -202,7 +237,7 @@ func GetMySelf(wfa Accessor) (ShallowDBObject, error) {
And: map[string][]dbs.Filter{
"relation": {{Operator: dbs.EQUAL.String(), Value: 1}},
},
}, "", false)
}, "", false, 0, 1)
if len(datas) > 0 && datas[0] != nil {
return datas[0], nil
}

View File

@@ -8,6 +8,7 @@ import (
// ShallowDBObject is an interface that defines the basic methods shallowed version of a DBObject
type ShallowDBObject interface {
DBObject
GenerateID()
GetID() string
GetName() string
@@ -18,6 +19,9 @@ type ShallowDBObject interface {
// DBObject is an interface that defines the basic methods for a DBObject
type DBObject interface {
GenerateID()
Extend(typ ...string) map[string][]tools.DataType
SetNotInCatalog(bool)
IsNotInCatalog() bool
SetID(id string)
GetID() string
GetName() string
@@ -53,8 +57,8 @@ type Accessor interface {
DeleteOne(id string) (DBObject, int, error)
CopyOne(data DBObject) (DBObject, int, error)
StoreOne(data DBObject) (DBObject, int, error)
LoadAll(isDraft bool) ([]ShallowDBObject, int, error)
LoadAll(isDraft bool, offset int64, limit int64) ([]ShallowDBObject, int, error)
UpdateOne(set map[string]interface{}, id string) (DBObject, int, error)
Search(filters *dbs.Filters, search string, isDraft bool) ([]ShallowDBObject, int, error)
Search(filters *dbs.Filters, search string, isDraft bool, offset int64, limit int64) ([]ShallowDBObject, int, error)
GetExec(isDraft bool) func(DBObject) ShallowDBObject
}

View File

@@ -45,6 +45,10 @@ func (wf *Graph) IsProcessing(item GraphItem) bool {
return item.Processing != nil
}
// IsService reports whether the graph item carries a service resource.
func (wf *Graph) IsService(item GraphItem) bool {
	return item.Service != nil
}
// IsNativeTool reports whether the graph item carries a native-tool resource.
func (wf *Graph) IsNativeTool(item GraphItem) bool {
	return item.NativeTool != nil
}
@@ -151,6 +155,8 @@ func (g *Graph) GetResource(id string) (tools.DataType, resources.ResourceInterf
return tools.PROCESSING_RESOURCE, item.Processing
} else if item.Storage != nil {
return tools.STORAGE_RESOURCE, item.Storage
} else if item.Service != nil {
return tools.SERVICE_RESOURCE, item.Service
}
}
return tools.INVALID, nil

View File

@@ -25,6 +25,10 @@ func (g *GraphItem) GetResource() (tools.DataType, resources.ResourceInterface)
return tools.PROCESSING_RESOURCE, g.Processing
} else if g.Storage != nil {
return tools.STORAGE_RESOURCE, g.Storage
} else if g.NativeTool != nil {
return tools.NATIVE_TOOL, g.NativeTool
} else if g.Service != nil {
return tools.SERVICE_RESOURCE, g.Service
}
return tools.INVALID, nil
}
@@ -35,4 +39,5 @@ func (g *GraphItem) Clear() {
g.Workflow = nil
g.Processing = nil
g.Storage = nil
g.Service = nil
}

View File

@@ -44,11 +44,14 @@ type Workflow struct {
utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name)
resources.ResourceSet
Graph *graph.Graph `bson:"graph,omitempty" json:"graph,omitempty"` // Graph UI & logic representation of the workflow
ScheduleActive bool `json:"schedule_active" bson:"schedule_active"` // ScheduleActive is a flag that indicates if the schedule is active, if not the workflow is not scheduled and no execution or booking will be set
// Schedule *WorkflowSchedule `bson:"schedule,omitempty" json:"schedule,omitempty"` // Schedule is the schedule of the workflow
Shared []string `json:"shared,omitempty" bson:"shared,omitempty"` // Shared is the ID of the shared workflow // AbstractWorkflow contains the basic fields of a workflow
Env []models.Param `json:"env,omitempty" bson:"env,omitempty"`
Inputs []models.Param `json:"inputs,omitempty" bson:"inputs,omitempty"`
Env map[string][]models.Param `json:"env" bson:"env"`
Inputs map[string][]models.Param `json:"inputs" bson:"inputs"`
Outputs map[string][]models.Param `json:"outputs" bson:"outputs"`
Args map[string][]string `json:"args" bson:"args"`
Exposes map[string][]models.Expose `bson:"exposes" json:"exposes"` // Expose is the execution
}
func (d *Workflow) GetAccessor(request *tools.APIRequest) utils.Accessor {
@@ -88,6 +91,11 @@ func (d *Workflow) GetResources(dt tools.DataType) []resources.ResourceInterface
itf = append(itf, d)
}
return itf
case tools.SERVICE_RESOURCE:
for _, d := range d.ServiceResources {
itf = append(itf, d)
}
return itf
}
return itf
}
@@ -226,6 +234,7 @@ func (d *Workflow) ExtractFromPlantUML(plantUML multipart.File, request *tools.A
}
d.generateResource(d.GetResources(tools.DATA_RESOURCE), request)
d.generateResource(d.GetResources(tools.PROCESSING_RESOURCE), request)
d.generateResource(d.GetResources(tools.SERVICE_RESOURCE), request)
d.generateResource(d.GetResources(tools.STORAGE_RESOURCE), request)
d.generateResource(d.GetResources(tools.COMPUTE_RESOURCE), request)
d.generateResource(d.GetResources(tools.WORKFLOW_RESOURCE), request)
@@ -624,7 +633,8 @@ func (wf *Workflow) Planify(start time.Time, end *time.Time, instances ConfigIte
priceds := map[tools.DataType]map[string]pricing.PricedItemITF{}
var err error
// 2. Plan processings first so we can derive the total workflow duration.
// 2. Plan processings and services first so we can derive the total workflow duration.
// Services in DEPLOYMENT mode return duration=-1 (open-ended); HOSTED mode returns a bounded call window.
ps, priceds, err := plan[*resources.ProcessingResource](tools.PROCESSING_RESOURCE, instances, partnerships, buyings, strategies, bookingMode, wf, priceds, request, wf.Graph.IsProcessing,
func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64, error) {
d, err := wf.Graph.GetAverageTimeProcessingBeforeStart(0, res.GetID(),
@@ -641,6 +651,24 @@ func (wf *Workflow) Planify(start time.Time, end *time.Time, instances ConfigIte
if err != nil {
return false, 0, priceds, nil, err
}
if _, priceds, err = plan[*resources.ServiceResource](tools.SERVICE_RESOURCE, instances, partnerships, buyings, strategies, bookingMode, wf, priceds, request, wf.Graph.IsService,
func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64, error) {
d, err := wf.Graph.GetAverageTimeProcessingBeforeStart(0, res.GetID(),
*instances.Get(res.GetID()), *partnerships.Get(res.GetID()), *buyings.Get(res.GetID()), *strategies.Get(res.GetID()),
bookingMode, request)
if err != nil {
return start, 0, err
}
return start.Add(time.Duration(d) * time.Second), priced.GetExplicitDurationInS(), nil
}, func(started time.Time, duration float64) (*time.Time, error) {
if duration < 0 {
return nil, nil // DEPLOYMENT mode: open-ended
}
s := started.Add(time.Duration(duration) * time.Second)
return &s, nil
}); err != nil {
return false, 0, priceds, nil, err
}
// Total workflow duration used as the booking window for compute/storage.
// Returns -1 if any processing is a service (open-ended).
@@ -790,6 +818,7 @@ func (w *Workflow) GetItemsByResources() map[tools.DataType]map[string][]string
tools.DATA_RESOURCE: func() []graph.GraphItem { return w.GetGraphItems(w.Graph.IsData) },
tools.COMPUTE_RESOURCE: func() []graph.GraphItem { return w.GetGraphItems(w.Graph.IsCompute) },
tools.PROCESSING_RESOURCE: func() []graph.GraphItem { return w.GetGraphItems(w.Graph.IsProcessing) },
tools.SERVICE_RESOURCE: func() []graph.GraphItem { return w.GetGraphItems(w.Graph.IsService) },
tools.WORKFLOW_RESOURCE: func() []graph.GraphItem { return w.GetGraphItems(w.Graph.IsWorkflow) },
}

View File

@@ -150,7 +150,7 @@ func (a *workflowMongoAccessor) execute(workflow *Workflow, delete bool, active
"abstractobject.name": {{Operator: dbs.LIKE.String(), Value: workflow.Name + "_workspace"}},
},
}
resource, _, err := a.workspaceAccessor.Search(filters, "", workflow.IsDraft)
resource, _, err := a.workspaceAccessor.Search(filters, "", workflow.IsDraft, 0, 10)
if delete { // if delete is set to true, delete the workspace
for _, r := range resource {
a.workspaceAccessor.DeleteOne(r.GetID())
@@ -192,9 +192,9 @@ func (a *workflowMongoAccessor) LoadOne(id string) (utils.DBObject, int, error)
}, a)
}
func (a *workflowMongoAccessor) Search(filters *dbs.Filters, search string, isDraft bool) ([]utils.ShallowDBObject, int, error) {
func (a *workflowMongoAccessor) Search(filters *dbs.Filters, search string, isDraft bool, offset int64, limit int64) ([]utils.ShallowDBObject, int, error) {
return utils.GenericSearch[*Workflow](filters, search, a.New().GetObjectFilters(search),
func(d utils.DBObject) utils.ShallowDBObject { return a.verifyResource(d) }, isDraft, a)
func(d utils.DBObject) utils.ShallowDBObject { return a.verifyResource(d) }, isDraft, a, offset, limit)
}
func (a *workflowMongoAccessor) verifyResource(obj utils.DBObject) utils.DBObject {
@@ -210,15 +210,19 @@ func (a *workflowMongoAccessor) verifyResource(obj utils.DBObject) utils.DBObjec
var access utils.Accessor
switch t {
case tools.COMPUTE_RESOURCE:
access = resources.NewAccessor[*resources.ComputeResource](t, a.GetRequest(), func() utils.DBObject { return &resources.ComputeResource{} })
access = resources.NewAccessor[*resources.ComputeResource](t, a.GetRequest())
case tools.PROCESSING_RESOURCE:
access = resources.NewAccessor[*resources.ProcessingResource](t, a.GetRequest(), func() utils.DBObject { return &resources.ProcessingResource{} })
access = resources.NewAccessor[*resources.ProcessingResource](t, a.GetRequest())
case tools.STORAGE_RESOURCE:
access = resources.NewAccessor[*resources.StorageResource](t, a.GetRequest(), func() utils.DBObject { return &resources.StorageResource{} })
access = resources.NewAccessor[*resources.StorageResource](t, a.GetRequest())
case tools.WORKFLOW_RESOURCE:
access = resources.NewAccessor[*resources.WorkflowResource](t, a.GetRequest(), func() utils.DBObject { return &resources.WorkflowResource{} })
access = resources.NewAccessor[*resources.WorkflowResource](t, a.GetRequest())
case tools.DATA_RESOURCE:
access = resources.NewAccessor[*resources.DataResource](t, a.GetRequest(), func() utils.DBObject { return &resources.DataResource{} })
access = resources.NewAccessor[*resources.DataResource](t, a.GetRequest())
case tools.NATIVE_TOOL:
access = resources.NewAccessor[*resources.NativeTool](t, a.GetRequest())
case tools.SERVICE_RESOURCE:
access = resources.NewAccessor[*resources.ServiceResource](t, a.GetRequest())
default:
wf.Graph.Clear(resource.GetID())
}

View File

@@ -0,0 +1,168 @@
package workflow_execution
import (
"slices"
"time"
workflowgraph "cloud.o-forge.io/core/oc-lib/models/workflow/graph"
)
// ExecutionStepState is the runtime state of a single step in the execution graph.
type ExecutionStepState string

const (
	StepWaiting ExecutionStepState = "waiting" // not started; dependencies may still be pending
	StepRunning ExecutionStepState = "running" // started, not yet finished
	StepSuccess ExecutionStepState = "success" // finished successfully (terminal)
	StepFailure ExecutionStepState = "failure" // finished in error (terminal)
)
// ExecutionGraphItem is the summarized view of one node in the workflow execution graph.
//
//   - Name        : human-readable label (resource name or item ID as fallback)
//   - StartDate   : set when the step transitions to StepRunning
//   - EndDate     : set when the step transitions to StepSuccess or StepFailure
//   - State       : current lifecycle state of the step
//   - Deps        : itemIDs that must reach StepSuccess before this step can start
//   - WhenRunning : itemIDs (resources) that become active while this step is running
//     (e.g. the compute node executing it, the storage it reads/writes)
type ExecutionGraphItem struct {
	Name        string             `json:"name" bson:"name"`
	StartDate   *time.Time         `json:"start_date,omitempty" bson:"start_date,omitempty"` // nil until the step starts
	EndDate     *time.Time         `json:"end_date,omitempty" bson:"end_date,omitempty"`     // nil until the step finishes
	State       ExecutionStepState `json:"state" bson:"state"`
	Deps        []string           `json:"deps,omitempty" bson:"deps,omitempty"`
	WhenRunning []string           `json:"when_running,omitempty" bson:"when_running,omitempty"`
}

// ExecutionGraph is a flat, scheduler-friendly summary of a workflow execution graph.
// The map key is the workflow graph item ID.
type ExecutionGraph map[string]ExecutionGraphItem
// BuildExecutionGraph derives an initial ExecutionGraph (all steps in StepWaiting)
// from a workflow graph. It infers:
//   - Deps        : predecessor item IDs based on link direction
//   - WhenRunning : sibling item IDs connected to a step by a link
//     (i.e. resources that are co-active when the step runs)
func BuildExecutionGraph(g *workflowgraph.Graph) ExecutionGraph {
	if g == nil {
		return ExecutionGraph{}
	}

	// dependsOn[dst] = source item IDs that dst must wait for.
	dependsOn := map[string][]string{}
	// coActive[id] = item IDs active while id is running.
	coActive := map[string][]string{}

	for _, link := range g.Links {
		from := link.Source.ID
		to := link.Destination.ID
		if from == "" || to == "" {
			continue
		}
		fromItem, haveFrom := g.Items[from]
		toItem, haveTo := g.Items[to]
		if !haveFrom || !haveTo {
			continue
		}

		// Steps (logical nodes that sequence execution): Data, Processing, Workflow, NativeTool.
		// Resources (infrastructure co-active while a step runs): Compute, Storage.
		fromIsStep := fromItem.Data != nil || fromItem.Processing != nil || fromItem.Workflow != nil || fromItem.NativeTool != nil
		toIsStep := toItem.Data != nil || toItem.Processing != nil || toItem.Workflow != nil || toItem.NativeTool != nil
		fromIsResource := fromItem.Compute != nil || fromItem.Storage != nil
		toIsResource := toItem.Compute != nil || toItem.Storage != nil

		if fromIsStep && toIsStep {
			// Sequential dependency: 'to' waits for 'from' to succeed.
			dependsOn[to] = appendUnique(dependsOn[to], from)
		} else if fromIsStep && toIsResource {
			// 'from' activates the resource 'to' while it runs.
			coActive[from] = appendUnique(coActive[from], to)
		} else if fromIsResource && toIsStep {
			// 'to' uses the resource 'from' while it runs.
			coActive[to] = appendUnique(coActive[to], from)
		}
	}

	result := ExecutionGraph{}
	for id, item := range g.Items {
		label := id
		if _, res := item.GetResource(); res != nil && res.GetName() != "" {
			label = res.GetName()
		}
		result[id] = ExecutionGraphItem{
			Name:        label,
			State:       StepWaiting,
			Deps:        dependsOn[id],
			WhenRunning: coActive[id],
		}
	}
	return result
}
// MarkRunning transitions the step to StepRunning and records the start time.
// Unknown IDs and steps that already reached a terminal state are left untouched.
func (eg ExecutionGraph) MarkRunning(itemID string, at time.Time) {
	step, found := eg[itemID]
	if !found {
		return
	}
	if step.State == StepSuccess || step.State == StepFailure {
		return
	}
	step.State = StepRunning
	step.StartDate = &at
	eg[itemID] = step
}
// MarkDone transitions the step to StepSuccess (success=true) or StepFailure
// (success=false) and records the end time. Unknown IDs are ignored.
func (eg ExecutionGraph) MarkDone(itemID string, success bool, at time.Time) {
	step, found := eg[itemID]
	if !found {
		return
	}
	step.EndDate = &at
	step.State = StepFailure
	if success {
		step.State = StepSuccess
	}
	eg[itemID] = step
}
// DepsSatisfied reports whether every dependency of the given item has reached
// StepSuccess. An unknown itemID is never satisfied; an item with no deps is
// trivially satisfied.
func (eg ExecutionGraph) DepsSatisfied(itemID string) bool {
	item, ok := eg[itemID]
	if !ok {
		return false
	}
	for _, dep := range item.Deps {
		depItem, depOk := eg[dep]
		if !depOk || depItem.State != StepSuccess {
			return false
		}
	}
	return true
}

// Depssatisfied returns true when all deps of the given item have reached StepSuccess.
//
// Deprecated: use DepsSatisfied instead (idiomatic Go MixedCaps naming).
// Kept as a thin wrapper for backward compatibility with existing callers.
func (eg ExecutionGraph) Depssatisfied(itemID string) bool {
	return eg.DepsSatisfied(itemID)
}
// ReadyToRun returns the IDs of all steps that are still waiting and whose deps
// are fully satisfied. Useful for the scheduler to decide what to start next.
// The result is sorted: map iteration order is random in Go, so without sorting
// the scheduler's start order would differ from run to run.
func (eg ExecutionGraph) ReadyToRun() []string {
	ready := []string{}
	for id, item := range eg {
		if item.State == StepWaiting && eg.Depssatisfied(id) {
			ready = append(ready, id)
		}
	}
	slices.Sort(ready)
	return ready
}
// appendUnique returns slice with val appended, unless val is already present,
// in which case slice is returned unchanged.
func appendUnique(slice []string, val string) []string {
	for _, existing := range slice {
		if existing == val {
			return slice
		}
	}
	return append(slice, val)
}

View File

@@ -17,6 +17,28 @@ import (
"go.mongodb.org/mongo-driver/bson/primitive"
)
// EmbeddedStorageSelection records which storage capability was activated on a
// compute unit graph node, and which pricing options were selected for it.
// Key in WorkflowExecution.SelectedEmbeddedStorages is the compute graph node ID.
// A nil/absent entry means no storage was activated on that compute unit.
type EmbeddedStorageSelection struct {
	StorageIndex     int `json:"storage_index" bson:"storage_index"`         // index in ComputeResourceInstance.AvailableStorages
	PartnershipIndex int `json:"partnership_index" bson:"partnership_index"` // index in the storage's partnerships
	BuyingIndex      int `json:"buying_index" bson:"buying_index"`           // index of the selected buying option for that storage
	StrategyIndex    int `json:"strategy_index" bson:"strategy_index"`       // index of the selected pricing strategy for that storage
}
// BookingState tracks the reservation and completion status of a single booking
// within a workflow execution.
//   - IsBooked: true while the resource is actively reserved (set on WORKFLOW_STARTED_EVENT,
//     cleared on WORKFLOW_STEP_DONE_EVENT / WORKFLOW_DONE_EVENT).
//   - IsDone: true once the booking has been confirmed by the remote peer (CONSIDERS_EVENT)
//     or completed (WORKFLOW_STEP_DONE_EVENT / WORKFLOW_DONE_EVENT).
type BookingState struct {
	IsBooked bool `json:"is_booked" bson:"is_booked"` // resource currently reserved
	IsDone   bool `json:"is_done" bson:"is_done"`     // booking confirmed or completed
}
/*
* WorkflowExecution is a struct that represents a list of workflow executions
* Warning: No user can write (del, post, put) a workflow execution, it is only used by the system
@@ -33,13 +55,37 @@ type WorkflowExecution struct {
State enum.BookingStatus `json:"state" bson:"state" default:"0"` // TEMPORARY TODO DEFAULT 1 -> 0 State is the state of the workflow
WorkflowID string `json:"workflow_id" bson:"workflow_id,omitempty"` // WorkflowID is the ID of the workflow
BookingsState map[string]bool `json:"bookings_state" bson:"bookings_state,omitempty"` // WorkflowID is the ID of the workflow
PurchasesState map[string]bool `json:"purchases_state" bson:"purchases_state,omitempty"` // WorkflowID is the ID of the workflow
BookingsState map[string]BookingState `json:"bookings_state" bson:"bookings_state,omitempty"` // booking_id → reservation+completion status
PurchasesState map[string]bool `json:"purchases_state" bson:"purchases_state,omitempty"` // purchase_id → confirmed
// Graph is a lightweight, real-time summary of the workflow execution graph.
// Keyed by workflow graph item ID; updated by oc-scheduler on each step-done event.
// Consumed by oc-front to render the live execution panel via websocket updates.
Graph ExecutionGraph `json:"graph,omitempty" bson:"graph,omitempty"`
SelectedInstances workflow.ConfigItem `json:"selected_instances"`
SelectedPartnerships workflow.ConfigItem `json:"selected_partnerships"`
SelectedBuyings workflow.ConfigItem `json:"selected_buyings"`
SelectedStrategies workflow.ConfigItem `json:"selected_strategies"`
// SelectedEmbeddedStorages records which storage capability was activated on
// each compute unit graph node (key = compute graph node ID).
// Populated by oc-scheduler, consumed by oc-monitord's argo builder.
SelectedEmbeddedStorages map[string]*EmbeddedStorageSelection `json:"selected_embedded_storages,omitempty" bson:"selected_embedded_storages,omitempty"`
}
// Extend augments the base AbstractObject extension map: for every requested
// "workflow" extension, tools.PEER is registered as an additional data type.
func (ri *WorkflowExecution) Extend(typ ...string) map[string][]tools.DataType {
	extended := ri.AbstractObject.Extend(typ...)
	for _, requested := range typ {
		if requested != "workflow" {
			continue
		}
		if _, present := extended[requested]; !present {
			extended[requested] = []tools.DataType{}
		}
		extended[requested] = append(extended[requested], tools.PEER)
	}
	return extended
}
func (r *WorkflowExecution) StoreDraftDefault() {
@@ -78,7 +124,7 @@ func (ws *WorkflowExecution) PurgeDraft(request *tools.APIRequest) error {
{Operator: dbs.GTE.String(), Value: primitive.NewDateTimeFromTime(ws.ExecDate)},
},
},
}, "", ws.IsDraft)
}, "", ws.IsDraft, 0, 10000)
if code != 200 || err != nil {
return err
}
@@ -189,9 +235,9 @@ func (d *WorkflowExecution) Book(executionsID string, wfID string, priceds map[t
booking = append(booking, d.bookEach(executionsID, wfID, tools.DATA_RESOURCE, priceds[tools.DATA_RESOURCE])...)
for _, p := range booking {
if d.BookingsState == nil {
d.BookingsState = map[string]bool{}
d.BookingsState = map[string]BookingState{}
}
d.BookingsState[p.GetID()] = false
d.BookingsState[p.GetID()] = BookingState{}
}
return booking
}
@@ -229,10 +275,14 @@ func (d *WorkflowExecution) bookEach(executionsID string, wfID string, dt tools.
var m map[string]interface{}
b, _ := json.Marshal(priced)
json.Unmarshal(b, &m)
name := priced.GetName()
if len(executionsID) > 8 {
name += " " + executionsID[:8]
}
bookingItem := &booking.Booking{
AbstractObject: utils.AbstractObject{
UUID: uuid.New().String(),
Name: d.GetName() + "_" + executionsID + "_" + wfID,
Name: name + " " + start.Format("2006-01-02 15:04"),
IsDraft: true,
},
PricedItem: m,
@@ -242,6 +292,7 @@ func (d *WorkflowExecution) bookEach(executionsID string, wfID string, dt tools.
InstanceID: priced.GetInstanceID(),
ResourceType: dt,
DestPeerID: priced.GetCreatorID(),
Peerless: priced.GetCreatorID() == "",
WorkflowID: wfID,
ExecutionID: d.GetID(),
ExpectedStartDate: start,

View File

@@ -49,7 +49,7 @@ func (a *workspaceMongoAccessor) DeleteOne(id string) (utils.DBObject, int, erro
// UpdateOne updates a workspace in the database, given its ID, it automatically share to peers if the workspace is shared
func (a *workspaceMongoAccessor) UpdateOne(set map[string]interface{}, id string) (utils.DBObject, int, error) {
if set["active"] == true { // If the workspace is active, deactivate all the other workspaces
/*if set["active"] == true { // If the workspace is active, deactivate all the other workspaces
res, _, err := a.LoadAll(true)
if err == nil {
for _, r := range res {
@@ -59,7 +59,7 @@ func (a *workspaceMongoAccessor) UpdateOne(set map[string]interface{}, id string
}
}
}
}
}*/
res, code, err := utils.GenericUpdateOne(set, id, a)
if code == 200 && res != nil {
a.share(res.(*Workspace), tools.PUT, a.GetCaller())
@@ -76,7 +76,7 @@ func (a *workspaceMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject,
},
}
// filters *dbs.Filters, word string, isDraft bool
res, _, err := a.Search(filters, "", true) // Search for the workspace
res, _, err := a.Search(filters, "", true, 0, 10) // Search for the workspace
if err == nil && len(res) > 0 { // If the workspace already exists, return an error
return nil, 409, errors.New("a workspace with the same name already exists")
}
@@ -87,24 +87,24 @@ func (a *workspaceMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject,
}
func (a *workspaceMongoAccessor) LoadOne(id string) (utils.DBObject, int, error) {
return utils.GenericLoadOne[*Workspace](id, a.New(), func(d utils.DBObject) (utils.DBObject, int, error) {
return utils.GenericLoadOne(id, a.New(), func(d utils.DBObject) (utils.DBObject, int, error) {
d.(*Workspace).Fill(a.GetRequest())
return d, 200, nil
}, a)
}
func (a *workspaceMongoAccessor) LoadAll(isDraft bool) ([]utils.ShallowDBObject, int, error) {
func (a *workspaceMongoAccessor) LoadAll(isDraft bool, offset int64, limit int64) ([]utils.ShallowDBObject, int, error) {
return utils.GenericLoadAll[*Workspace](func(d utils.DBObject) utils.ShallowDBObject {
d.(*Workspace).Fill(a.GetRequest())
return d
}, isDraft, a)
}, isDraft, a, offset, limit)
}
func (a *workspaceMongoAccessor) Search(filters *dbs.Filters, search string, isDraft bool) ([]utils.ShallowDBObject, int, error) {
func (a *workspaceMongoAccessor) Search(filters *dbs.Filters, search string, isDraft bool, offset int64, limit int64) ([]utils.ShallowDBObject, int, error) {
return utils.GenericSearch[*Workspace](filters, search, (&Workspace{}).GetObjectFilters(search), func(d utils.DBObject) utils.ShallowDBObject {
d.(*Workspace).Fill(a.GetRequest())
return d
}, isDraft, a)
}, isDraft, a, offset, limit)
}
/*

View File

@@ -60,16 +60,16 @@ func (s State) String() string {
type API struct{}
func (a *API) Discovered(infos []*beego.ControllerInfo) {
func (a *API) Discovered(infos []*beego.ControllerInfo, extra ...map[string][]string) {
respondToDiscovery := func(resp NATSResponse) {
var m map[string]interface{}
json.Unmarshal(resp.Payload, &m)
if len(m) == 0 {
a.SubscribeRouter(infos)
a.SubscribeRouter(infos, extra...)
}
}
a.ListenRouter(respondToDiscovery)
a.SubscribeRouter(infos)
a.SubscribeRouter(infos, extra...)
}
// GetState returns the state of the API
@@ -99,11 +99,12 @@ func (a *API) ListenRouter(exec func(msg NATSResponse)) {
})
}
func (a *API) SubscribeRouter(infos []*beego.ControllerInfo) {
func (a *API) SubscribeRouter(infos []*beego.ControllerInfo, extra ...map[string][]string) {
nats := NewNATSCaller()
appPrefix := "/" + strings.ReplaceAll(config.GetAppName(), "oc-", "")
discovery := map[string][]string{}
for _, info := range infos {
path := strings.ReplaceAll(info.GetPattern(), "/oc/", "/"+strings.ReplaceAll(config.GetAppName(), "oc-", ""))
path := strings.ReplaceAll(info.GetPattern(), "/oc/", appPrefix+"/")
for k, v := range info.GetMethod() {
if discovery[path] == nil {
discovery[path] = []string{}
@@ -115,6 +116,15 @@ func (a *API) SubscribeRouter(infos []*beego.ControllerInfo) {
}
}
}
for _, extraRoutes := range extra {
for rawPath, methods := range extraRoutes {
path := strings.ReplaceAll(rawPath, "/oc/", appPrefix+"/")
if discovery[path] == nil {
discovery[path] = []string{}
}
discovery[path] = append(discovery[path], methods...)
}
}
b, _ := json.Marshal(discovery)
go nats.SetNATSPub(DISCOVERY, NATSResponse{

View File

@@ -32,6 +32,8 @@ const (
BILL
NATIVE_TOOL
EXECUTION_VERIFICATION
ALLOWED_IMAGE
SERVICE_RESOURCE
)
var NOAPI = func() string {
@@ -88,6 +90,8 @@ var InnerDefaultAPI = [...]func() string{
NOAPI,
CATALOGAPI,
SCHEDULERAPI,
DATACENTERAPI,
CATALOGAPI,
}
// Bind the standard data name to the data type
@@ -114,6 +118,8 @@ var Str = [...]string{
"bill",
"native_tool",
"execution_verification",
"allowed_image",
"service_resource",
}
func FromString(comp string) int {
@@ -149,7 +155,7 @@ func DataTypeList() []DataType {
return []DataType{DATA_RESOURCE, PROCESSING_RESOURCE, STORAGE_RESOURCE, COMPUTE_RESOURCE, WORKFLOW_RESOURCE,
WORKFLOW, WORKFLOW_EXECUTION, WORKSPACE, PEER, COLLABORATIVE_AREA, RULE, BOOKING, WORKFLOW_HISTORY, WORKSPACE_HISTORY,
ORDER, PURCHASE_RESOURCE,
LIVE_DATACENTER, LIVE_STORAGE, BILL, NATIVE_TOOL}
LIVE_DATACENTER, LIVE_STORAGE, BILL, NATIVE_TOOL, EXECUTION_VERIFICATION, ALLOWED_IMAGE, SERVICE_RESOURCE}
}
type PropalgationMessage struct {
@@ -171,8 +177,14 @@ const (
PB_CONSIDERS
PB_ADMIRALTY_CONFIG
PB_MINIO_CONFIG
PB_PVC_CONFIG
PB_CLOSE_SEARCH
NONE
PB_OBSERVE
PB_OBSERVE_CLOSE
// PB_PROPAGATE is used by oc-discovery to broadcast a peer's online/offline
// state to other oc-discovery nodes in the federation via PROPALGATION_EVENT.
PB_PROPAGATE
)
func GetActionString(ss string) PubSubAction {
@@ -199,14 +211,40 @@ func GetActionString(ss string) PubSubAction {
return PB_MINIO_CONFIG
case "close_search":
return PB_CLOSE_SEARCH
case "observe":
return PB_OBSERVE
case "observe_close":
return PB_OBSERVE_CLOSE
case "propagate":
return PB_PROPAGATE
default:
return NONE
}
}
var path = []string{"search", "search_response", "create", "update", "delete", "planner", "close_planner",
"considers", "admiralty_config", "minio_config", "close_search"}
// path aligns with PubSubAction iota values for String().
var path = []string{
"search", // 0 PB_SEARCH
"search_response", // 1 PB_SEARCH_RESPONSE
"create", // 2 PB_CREATE
"update", // 3 PB_UPDATE
"delete", // 4 PB_DELETE
"planner", // 5 PB_PLANNER
"close_planner", // 6 PB_CLOSE_PLANNER
"considers", // 7 PB_CONSIDERS
"admiralty_config", // 8 PB_ADMIRALTY_CONFIG
"minio_config", // 9 PB_MINIO_CONFIG
"pvc_config", // 10 PB_PVC_CONFIG
"close_search", // 11 PB_CLOSE_SEARCH
"none", // 12 NONE
"observe", // 13 PB_OBSERVE
"observe_close", // 14 PB_OBSERVE_CLOSE
"propagate", // 15 PB_PROPAGATE
}
// String returns the upper-cased wire name of the action, or "unknown" for
// values outside the known range.
// BUG FIX: the lower bound is now checked too — a negative PubSubAction value
// previously passed the upper-bound check and panicked on the slice index.
func (m PubSubAction) String() string {
	if m < 0 || int(m) >= len(path) {
		return "unknown"
	}
	return strings.ToUpper(path[m])
}

View File

@@ -14,6 +14,7 @@ import (
v1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -97,8 +98,8 @@ func (k *KubernetesService) CreateNamespace(ctx context.Context, ns string) erro
namespace := &v1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: ns,
Labels: map[string]string{
"multicluster-scheduler": "enabled",
Annotations: map[string]string{
"multicluster.admiralty.io/elect": "",
},
},
}
@@ -199,9 +200,9 @@ func (k *KubernetesService) ProvisionExecutionNamespace(ctx context.Context, ns
}
role := "argo-role"
if err := k.CreateRole(ctx, ns, role,
[][]string{{"coordination.k8s.io"}, {""}, {""}},
[][]string{{"leases"}, {"secrets"}, {"pods"}},
[][]string{{"get", "create", "update"}, {"get"}, {"patch"}},
[][]string{{"coordination.k8s.io"}, {""}, {""}, {"multicluster.admiralty.io"}, {"argoproj.io"}},
[][]string{{"leases"}, {"secrets"}, {"pods"}, {"podchaperons"}, {"workflowtaskresults"}},
[][]string{{"get", "create", "update"}, {"get"}, {"patch"}, {"get", "list", "watch", "create", "update", "patch", "delete"}, {"create", "patch"}},
); err != nil {
return err
}
@@ -598,6 +599,75 @@ func (k *KubernetesService) CreateSecret(context context.Context, minioId string
return nil
}
// CreatePVC creates a static PersistentVolume + PersistentVolumeClaim in the given namespace.
// Static provisioning (no StorageClass) avoids the WaitForFirstConsumer deadlock
// with Admiralty virtual nodes — the PVC binds immediately.
// AlreadyExists errors are tolerated on both creates, making the method idempotent.
func (k *KubernetesService) CreatePVC(ctx context.Context, name, namespace, storageSize string) error {
	// Empty StorageClass name on both PV and PVC selects static (pre-bound) provisioning.
	storageClassName := ""
	pv := &v1.PersistentVolume{
		ObjectMeta: metav1.ObjectMeta{
			Name: name,
		},
		Spec: v1.PersistentVolumeSpec{
			Capacity: v1.ResourceList{
				// storageSize must be a valid quantity (e.g. "1Gi"); MustParse panics otherwise.
				v1.ResourceStorage: resource.MustParse(storageSize),
			},
			AccessModes:                   []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
			StorageClassName:              storageClassName,
			PersistentVolumeReclaimPolicy: v1.PersistentVolumeReclaimDelete,
			PersistentVolumeSource: v1.PersistentVolumeSource{
				// Node-local backing directory, created on demand.
				HostPath: &v1.HostPathVolumeSource{
					Path: "/var/lib/oc-storage/" + name,
					Type: func() *v1.HostPathType { t := v1.HostPathDirectoryOrCreate; return &t }(),
				},
			},
			// Pre-bind the PV to the claim of the same name so binding is immediate.
			ClaimRef: &v1.ObjectReference{
				Namespace: namespace,
				Name:      name,
			},
		},
	}
	_, err := k.Set.CoreV1().PersistentVolumes().Create(ctx, pv, metav1.CreateOptions{})
	if err != nil && !apierrors.IsAlreadyExists(err) {
		return fmt.Errorf("CreatePV %s: %w", name, err)
	}
	pvc := &v1.PersistentVolumeClaim{
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: namespace,
		},
		Spec: v1.PersistentVolumeClaimSpec{
			AccessModes:      []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
			StorageClassName: &storageClassName,
			// VolumeName pins the claim to the PV created above.
			VolumeName: name,
			Resources: v1.VolumeResourceRequirements{
				Requests: v1.ResourceList{
					v1.ResourceStorage: resource.MustParse(storageSize),
				},
			},
		},
	}
	_, err = k.Set.CoreV1().PersistentVolumeClaims(namespace).Create(ctx, pvc, metav1.CreateOptions{})
	if err != nil && !apierrors.IsAlreadyExists(err) {
		return fmt.Errorf("CreatePVC %s/%s: %w", namespace, name, err)
	}
	return nil
}
// DeletePVC removes the PersistentVolumeClaim and its backing PersistentVolume.
// A missing claim or volume is not treated as an error (idempotent deletion).
func (k *KubernetesService) DeletePVC(ctx context.Context, name, namespace string) error {
	if err := k.Set.CoreV1().PersistentVolumeClaims(namespace).Delete(ctx, name, metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) {
		return fmt.Errorf("DeletePVC %s/%s: %w", namespace, name, err)
	}
	if err := k.Set.CoreV1().PersistentVolumes().Delete(ctx, name, metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) {
		return fmt.Errorf("DeletePV %s: %w", name, err)
	}
	return nil
}
// ============== ADMIRALTY ==============
// Returns a concatenation of the peerId and namespace in order for
// kubernetes resources to have a unique name, under 63 characters

View File

@@ -29,8 +29,9 @@ type NATSMethod int
var meths = []string{"remove execution", "create execution", "planner execution", "discovery",
"workflow event", "argo kube event", "create resource", "remove resource",
"propalgation event", "search event", "confirm event",
"considers event", "admiralty config event", "minio config event",
"considers event", "admiralty config event", "minio config event", "pvc config event",
"workflow started event", "workflow step done event", "workflow done event",
"peer behavior event", "peer observe response event", "peer observe event",
}
const (
@@ -53,6 +54,7 @@ const (
CONSIDERS_EVENT
ADMIRALTY_CONFIG_EVENT
MINIO_CONFIG_EVENT
PVC_CONFIG_EVENT
// Workflow lifecycle events emitted by oc-monitord.
// oc-scheduler listens to STARTED and DONE to maintain WorkflowExecution state.
@@ -60,6 +62,22 @@ const (
WORKFLOW_STARTED_EVENT
WORKFLOW_STEP_DONE_EVENT
WORKFLOW_DONE_EVENT
// PEER_BEHAVIOR_EVENT is emitted by any trusted service (oc-scheduler,
// oc-datacenter, …) when a peer exhibits suspicious or fraudulent behavior.
// oc-discovery consumes it to update the peer's trust score and auto-blacklist
// below threshold.
PEER_BEHAVIOR_EVENT
// PEER_OBSERVE_RESPONSE_EVENT is emitted by oc-discovery each time it receives
// a heartbeat from an observed remote peer. oc-peer listens to this event to
// update the WS connectivity state for its clients.
PEER_OBSERVE_RESPONSE_EVENT
// PEER_OBSERVE_EVENT is emitted by oc-peer to request oc-discovery to start
// or stop observing a remote peer. Payload contains the target peer_id and a
// boolean close flag.
PEER_OBSERVE_EVENT
)
func (n NATSMethod) String() string {
@@ -70,8 +88,9 @@ func (n NATSMethod) String() string {
func NameToMethod(name string) NATSMethod {
for _, v := range [...]NATSMethod{REMOVE_EXECUTION, CREATE_EXECUTION, PLANNER_EXECUTION, DISCOVERY, WORKFLOW_EVENT, ARGO_KUBE_EVENT,
CREATE_RESOURCE, REMOVE_RESOURCE, PROPALGATION_EVENT, SEARCH_EVENT, CONFIRM_EVENT,
CONSIDERS_EVENT, ADMIRALTY_CONFIG_EVENT, MINIO_CONFIG_EVENT,
WORKFLOW_STARTED_EVENT, WORKFLOW_STEP_DONE_EVENT, WORKFLOW_DONE_EVENT} {
CONSIDERS_EVENT, ADMIRALTY_CONFIG_EVENT, MINIO_CONFIG_EVENT, PVC_CONFIG_EVENT,
WORKFLOW_STARTED_EVENT, WORKFLOW_STEP_DONE_EVENT, WORKFLOW_DONE_EVENT,
PEER_BEHAVIOR_EVENT, PEER_OBSERVE_RESPONSE_EVENT, PEER_OBSERVE_EVENT} {
if strings.Contains(strings.ToLower(v.String()), strings.ToLower(name)) {
return v
}

49
tools/peer_behavior.go Normal file
View File

@@ -0,0 +1,49 @@
package tools
import "time"
// BehaviorSeverity qualifies the gravity of a peer misbehavior.
type BehaviorSeverity int

const (
	// BehaviorWarn: minor inconsistency — slight trust penalty.
	BehaviorWarn BehaviorSeverity = iota
	// BehaviorFraud: deliberate data manipulation (e.g. fake peerless Ref,
	// invalid booking) — significant trust penalty.
	BehaviorFraud
	// BehaviorCritical: severe abuse (secret exfiltration, data corruption,
	// system-level attack) — heavy penalty, near-immediate blacklist.
	BehaviorCritical
)

// scorePenalties maps each severity to a trust-score deduction (out of 100).
var scorePenalties = map[BehaviorSeverity]float64{
	BehaviorWarn:     5,
	BehaviorFraud:    20,
	BehaviorCritical: 40,
}

// Penalty returns the trust-score deduction for this severity. A severity
// missing from the table falls back to the smallest deduction (5).
func (s BehaviorSeverity) Penalty() float64 {
	penalty, known := scorePenalties[s]
	if !known {
		// Unrecognized severities are treated as a mere warning.
		return 5
	}
	return penalty
}
// PeerBehaviorReport is the payload carried by PEER_BEHAVIOR_EVENT.
// Any trusted service can emit it; oc-discovery is the sole consumer.
type PeerBehaviorReport struct {
	// ReporterApp identifies the emitting service (e.g. "oc-scheduler", "oc-datacenter").
	ReporterApp string `json:"reporter_app"`
	// TargetPeerID is the MongoDB DID (_id) of the offending peer.
	TargetPeerID string `json:"target_peer_id"`
	// Severity drives how much the trust score drops (see BehaviorSeverity).
	Severity BehaviorSeverity `json:"severity"`
	// Reason is a human-readable description shown in the blacklist warning.
	Reason string `json:"reason"`
	// Evidence is an optional reference (booking ID, resource Ref, …);
	// omitted from the JSON payload when empty.
	Evidence string `json:"evidence,omitempty"`
	// At is the timestamp of the observed misbehavior.
	At time.Time `json:"at"`
}

View File

@@ -8,6 +8,7 @@ import (
"net/http"
"net/url"
"strings"
"sync"
)
// HTTP Method Enum defines the different methods that can be used to interact with the HTTP server
@@ -57,6 +58,7 @@ type HTTPCallerITF interface {
type HTTPCaller struct {
URLS map[DataType]map[METHOD]string // Map of the different methods and their urls
Disabled bool // Disabled flag
Mu sync.RWMutex
LastResults map[string]interface{} // Used to store information regarding the last execution of a given method on a given data type
}
@@ -217,6 +219,8 @@ func (caller *HTTPCaller) CallForm(method string, url string, subpath string,
}
func (caller *HTTPCaller) StoreResp(resp *http.Response) error {
caller.Mu.Lock()
defer caller.Mu.Unlock()
caller.LastResults = make(map[string]interface{})
caller.LastResults["header"] = resp.Header
caller.LastResults["code"] = resp.StatusCode