80 Commits

Author SHA1 Message Date
mr
88d2e52628 Correct 2026-03-20 16:14:07 +01:00
mr
9f861e5b8d Set up 2026-03-20 15:41:33 +01:00
mr
e4506f3b42 longest trace 2026-03-20 15:21:48 +01:00
mr
75d08aae7c time longest 2026-03-20 15:09:52 +01:00
mr
b288085f32 if 100% kick 2026-03-20 14:57:01 +01:00
mr
bd3e81be0c CHECK log 2026-03-20 14:51:08 +01:00
mr
fafa1186c2 out * 1 hour 2026-03-20 14:42:48 +01:00
mr
471eaff94c missing instanceID 2026-03-20 14:38:52 +01:00
mr
c9fcabac6e debug time 2026-03-20 14:32:46 +01:00
mr
478e68e6d4 Workout Time Scheduling 2026-03-20 14:20:26 +01:00
mr
5619010838 correct time loc 2026-03-20 14:01:14 +01:00
mr
f1a9214ac7 Check trigger strange 2026-03-20 13:41:12 +01:00
mr
e6eb516f39 ensurePricing 2026-03-20 13:28:35 +01:00
mr
1508cc3611 PricedItem evolved 2026-03-20 13:07:06 +01:00
mr
2abc035ec0 planner trace 2026-03-20 12:09:28 +01:00
mr
c34b8c6703 correction planner 2026-03-20 11:33:59 +01:00
mr
a62fbc6c7a Workflow lifecycle events + resource instance duration tracking
- Add WorkflowLifecycleEvent + StepMetric to tools/workflow_lifecycle.go
- Add WORKFLOW_STARTED_EVENT, WORKFLOW_STEP_DONE_EVENT, WORKFLOW_DONE_EVENT NATS methods
- ResourceInstance.UpdateAverageDuration for AverageDurationS running average
- Support Steps recap in WORKFLOW_DONE_EVENT for catch-up by oc-scheduler/oc-catalog

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-20 10:30:30 +01:00
mr
6e28dce02c provisionning 2026-03-19 15:52:55 +01:00
mr
fe3b185b60 err trace 2026-03-19 12:05:33 +01:00
mr
6641d38d9d DBAbstract 2026-03-19 11:32:51 +01:00
mr
93ad8db9a8 decoded CA 2026-03-19 11:17:14 +01:00
mr
4eb53917b8 Log 2026-03-19 10:50:00 +01:00
mr
c7884f5cde NewKubernetesService decoded 2026-03-19 09:05:42 +01:00
mr
5fca0480af suppress check error on get 2026-03-19 08:44:25 +01:00
mr
28b5b7d39f Provisionning Ns + TearDown Ns 2026-03-19 08:18:18 +01:00
mr
5b7edb53a9 OcLib 2026-03-19 07:56:47 +01:00
mr
5976795d44 New Channel to Clarify Movement 2026-03-18 15:38:22 +01:00
mr
3d22ff40fb PB -> ADMIRALTY + MINIO 2026-03-18 14:58:21 +01:00
mr
889656a95e argo kube event remains 2026-03-18 14:52:07 +01:00
mr
c66fbc809e argo event 2026-03-18 14:46:35 +01:00
mr
1a37a1b4aa loki adjust 2026-03-18 10:28:31 +01:00
mr
d4ac398cdb plantuml debug 2026-03-18 09:41:09 +01:00
mr
4eb112bee3 Debug 2026-03-18 09:17:22 +01:00
mr
d1214fe622 adjust Export 2026-03-18 09:10:58 +01:00
mr
6a907236fa export 2026-03-18 08:40:39 +01:00
mr
85314baac3 PlantUML doc & Human Readable commentary 2026-03-18 08:30:02 +01:00
mr
cec8033ddc by pass restriction 2026-03-17 16:46:40 +01:00
mr
d0645f5ca7 publishing is only allowed is it can be monitored and be accessible temp disable 2026-03-17 16:42:03 +01:00
mr
c39bc52312 setup draft as live 2026-03-17 16:35:35 +01:00
mr
0a87343e3e Copy 2026-03-17 16:24:42 +01:00
mr
96beaade24 access 2026-03-17 16:15:12 +01:00
mr
5753450965 oclib setup 2026-03-17 16:09:39 +01:00
mr
7f8d697e4c \n replaceAll 2026-03-17 15:49:27 +01:00
mr
94837f8d24 kicks out Required not Required 2026-03-17 15:31:25 +01:00
mr
e758144b46 forgot 2026-03-17 15:25:54 +01:00
mr
72be3118b7 NATSMethod 2026-03-17 14:59:27 +01:00
mr
67778e1e47 err 2026-03-17 14:54:13 +01:00
mr
562dfb18c1 graphItem 2026-03-17 14:37:06 +01:00
mr
2a2dd96870 graphVarName 2026-03-17 14:32:39 +01:00
mr
333476e2c5 Setup 2026-03-17 14:26:11 +01:00
mr
0fd2513278 Setup 2026-03-17 14:17:49 +01:00
mr
e79101f58d oc-lib 2026-03-17 14:03:19 +01:00
mr
b3dbc7687e setup 2026-03-17 13:52:43 +01:00
mr
8fd4f5faef items 2026-03-17 13:36:36 +01:00
mr
f7012e285f setup 2026-03-17 13:29:54 +01:00
mr
088b45b2cf Set up 2026-03-17 13:19:51 +01:00
mr
1ac735cef1 Stop rebuild id 2026-03-17 10:04:40 +01:00
mr
65237f0d1f implement 2026-03-17 09:32:02 +01:00
mr
9b2f945176 forgot about sessionID use ExecutionsID 2026-03-17 09:01:47 +01:00
mr
b110cbc260 error on usage start 2026-03-16 15:59:19 +01:00
mr
a4d81cbb67 sec 2026-03-16 13:16:50 +01:00
mr
9bf2c566e9 test 2026-03-16 12:48:21 +01:00
mr
6d8efd137a After 2026-03-16 12:32:39 +01:00
mr
40a986af41 order per session 2026-03-16 11:47:51 +01:00
mr
4a076ba237 SchedulingSessionID 2026-03-16 11:45:58 +01:00
mr
deb819c5af After check 2026-03-16 11:41:05 +01:00
mr
55a039bd66 follow date 2026-03-16 11:40:00 +01:00
mr
a86e78841b base draft 2026-03-16 10:59:31 +01:00
mr
48f034316b booking strange state 2026-03-16 10:49:39 +01:00
mr
9e5102893f not found del 2026-03-16 09:28:48 +01:00
mr
465b91fd6e Draft booking flow 2026-03-16 08:52:31 +01:00
mr
12ba346427 bookingstate 2026-03-13 14:32:05 +01:00
mr
2cdf15d722 default to subscription 2026-03-13 14:25:04 +01:00
mr
aeebd8b5b2 pricing strategy defaul is subscription 2026-03-13 14:03:18 +01:00
mr
e355af2bac pricing.PERMANENT 2026-03-13 13:55:40 +01:00
mr
a335c905b3 oclib PB_CLOSE_SEARCH 2026-03-12 15:11:50 +01:00
mr
a30173921f restricted update 2026-03-12 11:56:33 +01:00
mr
e28b79ac0d missing payload 2026-03-12 11:45:24 +01:00
mr
9645e71b54 Acces execution verification for manual verif 2026-03-12 11:40:17 +01:00
mr
9f514a133e add verification flow 2026-03-12 11:37:45 +01:00
50 changed files with 1494 additions and 471 deletions

View File

@@ -27,6 +27,7 @@ type Config struct {
InternalWorkspaceAPI string
InternalPeerAPI string
InternalDatacenterAPI string
InternalSchedulerAPI string
}
func (c Config) GetUrl() string {
@@ -49,7 +50,8 @@ func GetConfig() *Config {
func SetConfig(mongoUrl string, database string, natsUrl string, lokiUrl string, logLevel string, port int,
pkPath, ppPath,
internalCatalogAPI, internalSharedAPI, internalWorkflowAPI, internalWorkspaceAPI, internalPeerAPI, internalDatacenterAPI string) *Config {
internalCatalogAPI, internalSharedAPI, internalWorkflowAPI, internalWorkspaceAPI,
internalPeerAPI, internalDatacenterAPI string, internalSchedulerAPI string) *Config {
GetConfig().MongoUrl = mongoUrl
GetConfig().MongoDatabase = database
GetConfig().NATSUrl = natsUrl
@@ -66,5 +68,6 @@ func SetConfig(mongoUrl string, database string, natsUrl string, lokiUrl string,
GetConfig().InternalWorkspaceAPI = internalWorkspaceAPI
GetConfig().InternalPeerAPI = internalPeerAPI
GetConfig().InternalDatacenterAPI = internalDatacenterAPI
GetConfig().InternalSchedulerAPI = internalSchedulerAPI
return GetConfig()
}

View File

@@ -0,0 +1,214 @@
# PlantUML Format de commentaire human-readable
Ce document décrit la syntaxe des commentaires attachés aux ressources et aux liens
dans les fichiers PlantUML importés par OpenCloud.
---
## Syntaxe générale
```plantuml
TypeRessource(varName, "Nom affiché") ' clé: valeur, clé.sous_clé: valeur
```
### Règles de parsing
| Règle | Détail |
|---|---|
| Séparateur de paires | `,` |
| Séparateur clé/valeur | premier `:` de la paire (les URLs `http://...` sont gérées) |
| Sous-objets | notation pointée `access.container.image: nginx` |
| Types | auto-inférés : `bool` > `float64` > `string` |
| Fallback | JSON brut si le commentaire commence par `{` (compatibilité ascendante) |
### Comportement à l'import
Chaque ressource reçoit automatiquement une **instance par défaut**, seedée avec les
attributs de la ressource parente. Le commentaire vient ensuite **surcharger** uniquement
les champs explicitement renseignés.
> **Exception :** `WorkflowEvent` n'a pas d'instance (voir section dédiée).
---
## Ressources disponibles
### `Data(var, "nom")` Données
Ressource de données. Les attributs qualifient le modèle de données **et** son instance
(source d'accès).
| Clé | Type | Description |
|---|---|---|
| `type` | string | Type de données (`raster`, `vector`, `tabular`) |
| `quality` | string | Niveau de qualité |
| `open_data` | bool | Données en accès libre |
| `static` | bool | Données statiques (pas de mise à jour) |
| `personal_data` | bool | Contient des données personnelles |
| `anonymized_personal_data` | bool | Données personnelles anonymisées |
| `size` | float64 | Taille en GB |
| `access_protocol` | string | Protocole d'accès (`http`, `s3`, `ftp`) |
| `country` | string | Code pays ISO (`FR`, `DE`) |
| `location.latitude` | float64 | Latitude géographique |
| `location.longitude` | float64 | Longitude géographique |
| `source` | string | URL / endpoint d'accès à la donnée |
```plantuml
Data(d1, "Satellites L2A") ' type: raster, open_data: true, size: 120.5, source: https://catalogue.example.com, country: FR
```
---
### `Processing(var, "nom")` Traitement
Ressource de traitement (algorithme, conteneur, service). Les attributs qualifient
le modèle de traitement **et** sa configuration d'exécution.
| Clé | Type | Description |
|---|---|---|
| `infrastructure` | int | Infrastructure cible : `0`=DOCKER, `1`=KUBERNETES, `2`=SLURM, `3`=HW, `4`=CONDOR |
| `is_service` | bool | Traitement persistant (service long-running) |
| `open_source` | bool | Code source ouvert |
| `license` | string | Licence (`MIT`, `Apache-2.0`, `GPL-3.0`) |
| `maturity` | string | Maturité (`prototype`, `beta`, `production`) |
| `access_protocol` | string | Protocole d'accès |
| `country` | string | Code pays ISO |
| `location.latitude` | float64 | Latitude |
| `location.longitude` | float64 | Longitude |
| `access.container.image` | string | Image du conteneur |
| `access.container.command` | string | Commande de démarrage |
| `access.container.args` | string | Arguments de la commande |
```plantuml
Processing(p1, "NDVI Calc") ' infrastructure: 0, open_source: true, license: MIT, maturity: production, access.container.image: myrepo/ndvi:1.2
```
---
### `Storage(var, "nom")` Stockage
Ressource de stockage. Produit une instance live (`LiveStorage`) à l'import.
| Clé | Type | Description |
|---|---|---|
| `storage_type` | int | Type de stockage (enum) |
| `source` | string | URL / endpoint du stockage |
| `path` | string | Chemin ou bucket dans le stockage |
| `local` | bool | Stockage local |
| `security_level` | string | Niveau de sécurité |
| `size` | float64 | Taille allouée en GB |
| `encryption` | bool | Chiffrement activé |
| `redundancy` | string | Politique de redondance |
| `throughput` | string | Débit cible |
| `access_protocol` | string | Protocole (`s3`, `nfs`, `smb`) |
| `country` | string | Code pays ISO |
| `location.latitude` | float64 | Latitude |
| `location.longitude` | float64 | Longitude |
```plantuml
Storage(s1, "Minio OVH") ' source: http://minio.example.com:9000, path: /bucket/data, access_protocol: s3, encryption: true, size: 500, country: FR
```
---
### `ComputeUnit(var, "nom")` Unité de calcul
Ressource de calcul (datacenter, cluster). Produit une instance live (`LiveDatacenter`)
à l'import.
| Clé | Type | Description |
|---|---|---|
| `architecture` | string | Architecture CPU (`x86_64`, `arm64`) |
| `infrastructure` | int | `0`=DOCKER, `1`=KUBERNETES, `2`=SLURM, `3`=HW, `4`=CONDOR |
| `source` | string | URL de l'API du datacenter |
| `security_level` | string | Niveau de sécurité |
| `annual_co2_emissions` | float64 | Émissions CO annuelles (kg) |
| `access_protocol` | string | Protocole d'accès |
| `country` | string | Code pays ISO |
| `location.latitude` | float64 | Latitude |
| `location.longitude` | float64 | Longitude |
```plantuml
ComputeUnit(c1, "Datacenter Rennes") ' source: https://api.dc-rennes.example.com, infrastructure: 1, country: FR, location.latitude: 48.11, location.longitude: -1.68, security_level: high
```
---
### `WorkflowEvent(var, "nom")` Événement déclencheur de workflow
Crée directement un `NativeTool` de type `WORKFLOW_EVENT` (Kind = 0).
Représente le point de départ d'un workflow.
> **Pas d'instance. Pas de commentaire.**
> Le nom du `NativeTool` est forcé à `WORKFLOW_EVENT` à l'import.
```plantuml
WorkflowEvent(e1, "Start")
```
---
## Liens
Les commentaires sur les liens qualifient la connexion entre deux ressources
(typiquement entre un traitement et un stockage).
### Syntaxe
```plantuml
source --> destination ' clé: valeur
source <-- destination ' clé: valeur
source -- destination ' clé: valeur (non directionnel)
```
### Attributs disponibles
| Clé | Type | Description |
|---|---|---|
| `storage_link_infos.write` | bool | `true` = écriture, `false` = lecture |
| `storage_link_infos.source` | string | Chemin source dans le lien |
| `storage_link_infos.destination` | string | Chemin destination dans le lien |
| `storage_link_infos.filename` | string | Nom du fichier échangé |
```plantuml
p1 --> s1 ' storage_link_infos.write: true, storage_link_infos.filename: output.tif
d1 --> p1
```
---
## Exemple complet
```plantuml
@startuml
!include opencloud.puml
WorkflowEvent(e1, "Start")
Data(d1, "Satellites L2A") ' type: raster, open_data: true, size: 120.5, source: https://catalogue.example.com, country: FR
Processing(p1, "NDVI") ' infrastructure: 0, open_source: true, license: MIT, access.container.image: myrepo/ndvi:1.2
Storage(s1, "Minio résultats") ' source: http://minio.example.com:9000, path: /results, access_protocol: s3, encryption: true, size: 500, country: FR
ComputeUnit(c1, "DC Rennes") ' source: https://api.dc.example.com, infrastructure: 1, country: FR, location.latitude: 48.11, location.longitude: -1.68
e1 --> p1
d1 --> p1
p1 --> s1 ' storage_link_infos.write: true, storage_link_infos.filename: ndvi.tif
s1 --> c1
@enduml
```
---
## Récapitulatif des types de ressources
| Mot-clé PlantUML | Type Go | Instance | Live | Commentaire |
|---|---|---|---|---|
| `Data` | `DataResource` | `DataInstance` | non | oui |
| `Processing` | `ProcessingResource` | `ProcessingInstance` | non | oui |
| `Storage` | `StorageResource` | `StorageResourceInstance` | oui `LiveStorage` | oui |
| `ComputeUnit` | `ComputeResource` | `ComputeResourceInstance` | oui `LiveDatacenter` | oui |
| `WorkflowEvent` | `NativeTool` (Kind=WORKFLOW_EVENT) | aucune | non | non |

View File

@@ -45,24 +45,25 @@ type LibDataEnum int
// init accessible constant to retrieve data from the database
const (
INVALID LibDataEnum = iota
DATA_RESOURCE = tools.DATA_RESOURCE
PROCESSING_RESOURCE = tools.PROCESSING_RESOURCE
STORAGE_RESOURCE = tools.STORAGE_RESOURCE
COMPUTE_RESOURCE = tools.COMPUTE_RESOURCE
WORKFLOW_RESOURCE = tools.WORKFLOW_RESOURCE
WORKFLOW = tools.WORKFLOW
WORKSPACE = tools.WORKSPACE
WORKFLOW_EXECUTION = tools.WORKFLOW_EXECUTION
PEER = tools.PEER
COLLABORATIVE_AREA = tools.COLLABORATIVE_AREA
RULE = tools.RULE
BOOKING = tools.BOOKING
ORDER = tools.ORDER
LIVE_DATACENTER = tools.LIVE_DATACENTER
LIVE_STORAGE = tools.LIVE_STORAGE
PURCHASE_RESOURCE = tools.PURCHASE_RESOURCE
NATIVE_TOOL = tools.NATIVE_TOOL
INVALID LibDataEnum = iota
DATA_RESOURCE = tools.DATA_RESOURCE
PROCESSING_RESOURCE = tools.PROCESSING_RESOURCE
STORAGE_RESOURCE = tools.STORAGE_RESOURCE
COMPUTE_RESOURCE = tools.COMPUTE_RESOURCE
WORKFLOW_RESOURCE = tools.WORKFLOW_RESOURCE
WORKFLOW = tools.WORKFLOW
WORKSPACE = tools.WORKSPACE
WORKFLOW_EXECUTION = tools.WORKFLOW_EXECUTION
PEER = tools.PEER
COLLABORATIVE_AREA = tools.COLLABORATIVE_AREA
RULE = tools.RULE
BOOKING = tools.BOOKING
ORDER = tools.ORDER
LIVE_DATACENTER = tools.LIVE_DATACENTER
LIVE_STORAGE = tools.LIVE_STORAGE
PURCHASE_RESOURCE = tools.PURCHASE_RESOURCE
NATIVE_TOOL = tools.NATIVE_TOOL
EXECUTION_VERIFICATION = tools.EXECUTION_VERIFICATION
)
func GetMySelf() (*peer.Peer, error) {
@@ -164,6 +165,7 @@ func InitDaemon(appName string) {
o.GetStringDefault("INTERNAL_WORKSPACE_API", "oc-workspace"),
o.GetStringDefault("INTERNAL_PEER_API", "oc-peer"),
o.GetStringDefault("INTERNAL_DATACENTER_API", "oc-datacenter"),
o.GetStringDefault("INTERNAL_SCHEDULER_API", "oc-scheduler"),
)
// Beego init
beego.BConfig.AppName = appName
@@ -253,8 +255,10 @@ func GetLogger() zerolog.Logger {
*/
func SetConfig(mongoUrl string, database string, natsUrl string, lokiUrl string, logLevel string,
port int, pppath string, pkpath string,
internalCatalogAPI, internalSharedAPI, internalWorkflowAPI, internalWorkspaceAPI, internalPeerAPI, internalDatacenterAPI string) *config.Config {
cfg := config.SetConfig(mongoUrl, database, natsUrl, lokiUrl, logLevel, port, pkpath, pppath, internalCatalogAPI, internalSharedAPI, internalWorkflowAPI, internalWorkspaceAPI, internalPeerAPI, internalDatacenterAPI)
internalCatalogAPI, internalSharedAPI, internalWorkflowAPI,
internalWorkspaceAPI, internalPeerAPI, internalDatacenterAPI string, internalSchedulerAPI string) *config.Config {
cfg := config.SetConfig(mongoUrl, database, natsUrl, lokiUrl, logLevel, port, pkpath, pppath, internalCatalogAPI, internalSharedAPI, internalWorkflowAPI,
internalWorkspaceAPI, internalPeerAPI, internalDatacenterAPI, internalSchedulerAPI)
defer func() {
if r := recover(); r != nil {
tools.UncatchedError = append(tools.UncatchedError, errors.New("Panic recovered in Init : "+fmt.Sprintf("%v", r)+" - "+string(debug.Stack())))

View File

@@ -59,13 +59,12 @@ func (w *LokiWriter) Write(p []byte) (n int, err error) {
// A bit unsafe since we don't know what could be stored in the event
// but we can't access this object once passed to the multilevel writter
for k,v := range(event){
if k != "level" && k != "time" && k != "message"{
labels[k] = v.(string)
for k, v := range event {
if k != "level" && k != "time" && k != "message" {
labels[k] = fmt.Sprintf("%v", v)
}
}
// Format the timestamp in nanoseconds
timestamp := fmt.Sprintf("%d000000", time.Now().UnixNano()/int64(time.Millisecond))
@@ -87,7 +86,7 @@ func (w *LokiWriter) Write(p []byte) (n int, err error) {
//fmt.Printf("Sending payload to Loki: %s\n", string(payloadBytes))
req, err := http.NewRequest("POST", w.url + "/loki/api/v1/push", bytes.NewReader(payloadBytes))
req, err := http.NewRequest("POST", w.url+"/loki/api/v1/push", bytes.NewReader(payloadBytes))
if err != nil {
return 0, fmt.Errorf("failed to create HTTP request: %w", err)
}

View File

@@ -2,12 +2,12 @@ package bill
import (
"encoding/json"
"fmt"
"sync"
"time"
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/common/pricing"
"cloud.o-forge.io/core/oc-lib/models/order"
"cloud.o-forge.io/core/oc-lib/models/peer"
"cloud.o-forge.io/core/oc-lib/models/resources"
@@ -49,9 +49,10 @@ func DraftFirstBill(order *order.Order, request *tools.APIRequest) (*Bill, error
peers[p.DestPeerID] = []*PeerItemOrder{}
}
peers[p.DestPeerID] = append(peers[p.DestPeerID], &PeerItemOrder{
Purchase: p,
Item: p.PricedItem,
Quantity: 1,
ResourceType: p.ResourceType,
Purchase: p,
Item: p.PricedItem,
Quantity: 1,
})
}
for _, b := range order.Bookings {
@@ -70,7 +71,9 @@ func DraftFirstBill(order *order.Order, request *tools.APIRequest) (*Bill, error
peers[b.DestPeerID] = []*PeerItemOrder{}
}
peers[b.DestPeerID] = append(peers[b.DestPeerID], &PeerItemOrder{
Item: b.PricedItem,
ResourceType: b.ResourceType,
Quantity: 1,
Item: b.PricedItem,
})
}
peerOrders := map[string]*PeerOrder{}
@@ -136,6 +139,22 @@ type PeerOrder struct {
Total float64 `json:"total,omitempty" bson:"total,omitempty"`
}
func PricedByType(dt tools.DataType) pricing.PricedItemITF {
switch dt {
case tools.PROCESSING_RESOURCE:
return &resources.PricedProcessingResource{}
case tools.STORAGE_RESOURCE:
return &resources.PricedStorageResource{}
case tools.DATA_RESOURCE:
return &resources.PricedDataResource{}
case tools.COMPUTE_RESOURCE:
return &resources.PricedComputeResource{}
case tools.WORKFLOW_RESOURCE:
return &resources.PricedResource[*pricing.ExploitPricingProfile[pricing.TimePricingStrategy]]{}
}
return nil
}
func (d *PeerOrder) Pay(request *tools.APIRequest, response chan *PeerOrder, wg *sync.WaitGroup) {
d.Status = enum.PENDING
@@ -145,7 +164,7 @@ func (d *PeerOrder) Pay(request *tools.APIRequest, response chan *PeerOrder, wg
d.Status = enum.PAID // TO REMOVE LATER IT'S A MOCK
if d.Status == enum.PAID {
for _, b := range d.Items {
var priced *resources.PricedResource
priced := PricedByType(b.ResourceType)
bb, _ := json.Marshal(b.Item)
json.Unmarshal(bb, priced)
if !priced.IsPurchasable() {
@@ -179,9 +198,10 @@ func (d *PeerOrder) SumUpBill(request *tools.APIRequest) error {
}
type PeerItemOrder struct {
Quantity int `json:"quantity,omitempty" bson:"quantity,omitempty"`
Purchase *purchase_resource.PurchaseResource `json:"purchase,omitempty" bson:"purchase,omitempty"`
Item map[string]interface{} `json:"item,omitempty" bson:"item,omitempty"`
ResourceType tools.DataType `json:"datatype,omitempty" bson:"datatype,omitempty"`
Quantity int `json:"quantity,omitempty" bson:"quantity,omitempty"`
Purchase *purchase_resource.PurchaseResource `json:"purchase,omitempty" bson:"purchase,omitempty"`
Item map[string]interface{} `json:"item,omitempty" bson:"item,omitempty"`
}
func (d *PeerItemOrder) GetPriceHT(request *tools.APIRequest) (float64, error) {
@@ -190,11 +210,10 @@ func (d *PeerItemOrder) GetPriceHT(request *tools.APIRequest) (float64, error) {
return 0, nil
}
///////////
var priced *resources.PricedResource
priced := PricedByType(d.ResourceType)
b, _ := json.Marshal(d.Item)
err := json.Unmarshal(b, priced)
if err != nil {
fmt.Println(err)
return 0, err
}
accessor := purchase_resource.NewAccessor(request)

View File

@@ -3,12 +3,10 @@ package booking
import (
"time"
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/common/models"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
"go.mongodb.org/mongo-driver/bson/primitive"
)
/*
@@ -25,7 +23,7 @@ type Booking struct {
DestPeerID string `json:"dest_peer_id,omitempty" bson:"dest_peer_id,omitempty"` // DestPeerID is the ID of the destination peer
WorkflowID string `json:"workflow_id,omitempty" bson:"workflow_id,omitempty"` // WorkflowID is the ID of the workflow
ExecutionID string `json:"execution_id,omitempty" bson:"execution_id,omitempty" validate:"required"`
State enum.BookingStatus `json:"state,omitempty" bson:"state,omitempty" validate:"required"` // State is the state of the booking
State enum.BookingStatus `json:"state" bson:"state"` // State is the state of the booking
ExpectedStartDate time.Time `json:"expected_start_date,omitempty" bson:"expected_start_date,omitempty" validate:"required"` // ExpectedStartDate is the expected start date of the booking
ExpectedEndDate *time.Time `json:"expected_end_date,omitempty" bson:"expected_end_date,omitempty" validate:"required"` // ExpectedEndDate is the expected end date of the booking
@@ -36,6 +34,9 @@ type Booking struct {
ResourceID string `json:"resource_id,omitempty" bson:"resource_id,omitempty" validate:"required"` // could be a Compute or a Storage
InstanceID string `json:"instance_id,omitempty" bson:"instance_id,omitempty" validate:"required"` // could be a Compute or a Storage
// Authorization: identifies who created this draft and the Check session it belongs to.
// Used to verify UPDATE and DELETE orders from remote schedulers.
SchedulerPeerID string `json:"scheduler_peer_id,omitempty" bson:"scheduler_peer_id,omitempty"`
}
func (b *Booking) CalcDeltaOfExecution() map[string]map[string]models.MetricResume {
@@ -65,40 +66,15 @@ func (b *Booking) CalcDeltaOfExecution() map[string]map[string]models.MetricResu
return m
}
// CheckBooking checks if a booking is possible on a specific compute resource
func (wfa *Booking) Check(id string, start time.Time, end *time.Time, parrallelAllowed int) (bool, error) {
// check if
if end == nil {
// if no end... then Book like a savage
e := start.Add(time.Hour)
end = &e
}
accessor := NewAccessor(nil)
res, code, err := accessor.Search(&dbs.Filters{
And: map[string][]dbs.Filter{ // check if there is a booking on the same compute resource by filtering on the compute_resource_id, the state and the execution date
"resource_id": {{Operator: dbs.EQUAL.String(), Value: id}},
"state": {{Operator: dbs.EQUAL.String(), Value: enum.DRAFT.EnumIndex()}},
"expected_start_date": {
{Operator: dbs.LTE.String(), Value: primitive.NewDateTimeFromTime(*end)},
{Operator: dbs.GTE.String(), Value: primitive.NewDateTimeFromTime(start)},
},
},
}, "", wfa.IsDraft)
if code != 200 {
return false, err
}
return len(res) <= parrallelAllowed, nil
}
func (d *Booking) GetDelayForLaunch() time.Duration {
return d.RealStartDate.Sub(d.ExpectedStartDate)
}
func (d *Booking) GetDelayForFinishing() time.Duration {
if d.ExpectedEndDate == nil {
if d.ExpectedEndDate == nil || d.RealEndDate == nil {
return time.Duration(0)
}
return d.RealEndDate.Sub(d.ExpectedStartDate)
return d.RealEndDate.Sub(*d.ExpectedEndDate)
}
func (d *Booking) GetUsualDuration() time.Duration {
@@ -125,18 +101,25 @@ func (d *Booking) VerifyAuth(callName string, request *tools.APIRequest) bool {
}
func (r *Booking) StoreDraftDefault() {
r.IsDraft = false
r.IsDraft = true
r.State = enum.DRAFT
}
func (r *Booking) CanUpdate(set utils.DBObject) (bool, utils.DBObject) {
if !r.IsDraft && r.State != set.(*Booking).State || r.RealStartDate != set.(*Booking).RealStartDate || r.RealEndDate != set.(*Booking).RealEndDate {
return true, &Booking{
State: set.(*Booking).State,
RealStartDate: set.(*Booking).RealStartDate,
RealEndDate: set.(*Booking).RealEndDate,
} // only state can be updated
incoming := set.(*Booking)
if !r.IsDraft && r.State != incoming.State || r.RealStartDate != incoming.RealStartDate || r.RealEndDate != incoming.RealEndDate {
patch := &Booking{
State: incoming.State,
RealStartDate: incoming.RealStartDate,
RealEndDate: incoming.RealEndDate,
}
// Auto-set RealStartDate when transitioning to STARTED and not already set
if r.State != enum.STARTED && incoming.State == enum.STARTED && patch.RealStartDate == nil {
now := time.Now()
patch.RealStartDate = &now
}
return true, patch
}
// TODO : HERE WE CAN HANDLE THE CASE WHERE THE BOOKING IS DELAYED OR EXCEEDING OR ending sooner
return r.IsDraft, set
}

View File

@@ -2,9 +2,12 @@ package planner
import (
"encoding/json"
"sort"
"time"
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/models/booking"
"cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/tools"
)
@@ -37,6 +40,12 @@ type PlannerSlot struct {
Usage map[string]float64 `json:"usage,omitempty"` // dimension -> % of max (0-100)
}
// PlannerITF is the interface used by Planify to check resource availability.
// *Planner satisfies this interface.
type PlannerITF interface {
NextAvailableStart(resourceID, instanceID string, start time.Time, d time.Duration) time.Time
}
// Planner is a volatile (non-persisted) object that organises bookings by resource.
// Only ComputeResource and StorageResource bookings appear in the schedule.
type Planner struct {
@@ -58,10 +67,23 @@ func GenerateShallow(request *tools.APIRequest) (*Planner, error) {
func generate(request *tools.APIRequest, shallow bool) (*Planner, error) {
accessor := booking.NewAccessor(request)
bookings, code, err := accessor.Search(nil, "*", false)
// Include both confirmed (IsDraft=false) and draft (IsDraft=true) bookings
// so the planner reflects the full picture: first-come first-served on all
// pending reservations regardless of confirmation state.
confirmed, code, err := accessor.Search(&dbs.Filters{
And: map[string][]dbs.Filter{
"expected_start_date": {{Operator: dbs.GTE.String(), Value: time.Now().UTC()}},
},
}, "*", false)
if code != 200 || err != nil {
return nil, err
}
drafts, _, _ := accessor.Search(&dbs.Filters{
And: map[string][]dbs.Filter{
"expected_start_date": {{Operator: dbs.GTE.String(), Value: time.Now().UTC()}},
},
}, "*", true)
bookings := append(confirmed, drafts...)
p := &Planner{
GeneratedAt: time.Now(),
@@ -72,6 +94,12 @@ func generate(request *tools.APIRequest, shallow bool) (*Planner, error) {
for _, b := range bookings {
bk := b.(*booking.Booking)
// Skip terminal bookings — they no longer occupy capacity.
switch bk.State {
case enum.SUCCESS, enum.FAILURE, enum.FORGOTTEN, enum.CANCELLED:
continue
}
// Only compute and storage resources are eligible
if bk.ResourceType != tools.COMPUTE_RESOURCE && bk.ResourceType != tools.STORAGE_RESOURCE {
continue
@@ -79,11 +107,14 @@ func generate(request *tools.APIRequest, shallow bool) (*Planner, error) {
end := bk.ExpectedEndDate
if end == nil {
e := bk.ExpectedStartDate.Add(time.Hour)
e := bk.ExpectedStartDate.UTC().Add(5 * time.Minute)
end = &e
}
instanceID, usage, cap := extractSlotData(bk, request)
if instanceID == "" {
instanceID = bk.InstanceID
}
if cap != nil && instanceID != "" {
if p.Capacities[bk.ResourceID] == nil {
@@ -121,7 +152,7 @@ func generate(request *tools.APIRequest, shallow bool) (*Planner, error) {
// If no capacity is known for this instance (never booked), it is fully available.
func (p *Planner) Check(resourceID string, instanceID string, req *ResourceRequest, start time.Time, end *time.Time) bool {
if end == nil {
e := start.Add(time.Hour)
e := start.Add(5 * time.Minute)
end = &e
}
@@ -132,7 +163,6 @@ func (p *Planner) Check(resourceID string, instanceID string, req *ResourceReque
if !ok {
return true
}
for _, slot := range slots {
// Only consider slots on the same instance
if slot.InstanceID != instanceID {
@@ -142,9 +172,13 @@ func (p *Planner) Check(resourceID string, instanceID string, req *ResourceReque
if !slot.Start.Before(*end) || !slot.End.After(start) {
continue
}
// If capacity is unknown (reqPct empty), any overlap blocks the slot.
if len(reqPct) == 0 {
return false
}
// Combined usage must not exceed 100 % for any requested dimension
for dim, needed := range reqPct {
if slot.Usage[dim]+needed > 100.0 {
if slot.Usage[dim]+needed >= 100.0 {
return false
}
}
@@ -250,7 +284,6 @@ func extractSlotData(bk *booking.Booking, request *tools.APIRequest) (instanceID
if err != nil {
return
}
switch bk.ResourceType {
case tools.COMPUTE_RESOURCE:
instanceID, usage, cap = extractComputeSlot(b, bk.ResourceID, request)
@@ -432,3 +465,34 @@ func totalRAM(instance *resources.ComputeResourceInstance) float64 {
}
return total
}
// NextAvailableStart returns the earliest time >= start when resourceID/instanceID has a
// free window of duration d. Slots are scanned in order so a single linear pass suffices.
// If the planner has no slots for this resource/instance, start is returned unchanged.
func (p *Planner) NextAvailableStart(resourceID, instanceID string, start time.Time, d time.Duration) time.Time {
slots := p.Schedule[resourceID]
if len(slots) == 0 {
return start
}
// Collect and sort slots for this instance by start time.
relevant := make([]*PlannerSlot, 0, len(slots))
for _, s := range slots {
if s.InstanceID == instanceID {
relevant = append(relevant, s)
}
}
sort.Slice(relevant, func(i, j int) bool { return relevant[i].Start.Before(relevant[j].Start) })
end := start.Add(d)
for _, slot := range relevant {
if !slot.Start.Before(end) {
break // all remaining slots start after our window — done
}
if slot.End.After(start) {
// conflict: push start to after this slot
start = slot.End
end = start.Add(d)
}
}
return start
}

View File

@@ -13,8 +13,8 @@ import (
)
func TestBooking_GetDurations(t *testing.T) {
start := time.Now().Add(-2 * time.Hour)
end := start.Add(1 * time.Hour)
start := time.Now().Add(-10 * time.Minute)
end := start.Add(5 * time.Minute)
realStart := start.Add(30 * time.Minute)
realEnd := realStart.Add(90 * time.Minute)

View File

@@ -92,7 +92,6 @@ func filterEnrich[T utils.ShallowDBObject](arr []string, isDrafted bool, a utils
"abstractobject.id": {{Operator: dbs.IN.String(), Value: arr}},
},
}, "", isDrafted)
fmt.Println(res, arr, isDrafted, a)
if code == 200 {
for _, r := range res {
new = append(new, r.(T))

View File

@@ -7,36 +7,36 @@ import (
"cloud.o-forge.io/core/oc-lib/tools"
)
func GetPlannerNearestStart(start time.Time, planned map[tools.DataType]map[string]pricing.PricedItemITF, request *tools.APIRequest) float64 {
near := float64(10000000000) // set a high value
func GetPlannerNearestStart(start time.Time, planned map[tools.DataType]map[string]pricing.PricedItemITF) float64 {
near := float64(-1) // unset sentinel
for _, items := range planned { // loop through the planned items
for _, priced := range items { // loop through the priced items
if priced.GetLocationStart() == nil { // if the start is nil,
continue // skip the iteration
}
newS := priced.GetLocationStart() // get the start
if newS.Sub(start).Seconds() < near { // if the difference between the start and the new start is less than the nearest start
near = newS.Sub(start).Seconds()
newS := priced.GetLocationStart() // get the start
diff := newS.Sub(start).Seconds() // get the difference
if near < 0 || diff < near { // if the difference is less than the nearest start
near = diff
}
}
}
if near < 0 {
return 0 // no items found, start at the given start time
}
return near
}
func GetPlannerLongestTime(end *time.Time, planned map[tools.DataType]map[string]pricing.PricedItemITF, request *tools.APIRequest) float64 {
if end == nil {
return -1
}
// GetPlannerLongestTime returns the sum of all processing durations (conservative estimate).
// Returns -1 if any processing is a service (open-ended).
func GetPlannerLongestTime(planned map[tools.DataType]map[string]pricing.PricedItemITF) float64 {
longestTime := float64(0)
for _, priced := range planned[tools.PROCESSING_RESOURCE] {
if priced.GetLocationEnd() == nil {
continue
d := priced.GetExplicitDurationInS()
if d < 0 {
return -1 // service present: booking is open-ended
}
newS := priced.GetLocationEnd()
if end == nil && longestTime < newS.Sub(*end).Seconds() {
longestTime = newS.Sub(*end).Seconds()
}
// get the nearest start from start var
longestTime += d
}
return longestTime
}

View File

@@ -59,8 +59,8 @@ func GetDefaultPricingProfile() PricingProfileITF {
Pricing: PricingStrategy[TimePricingStrategy]{
Price: 0,
Currency: "EUR",
BuyingStrategy: PERMANENT,
TimePricingStrategy: ONCE,
BuyingStrategy: SUBSCRIPTION,
TimePricingStrategy: PER_SECOND,
},
}
}

View File

@@ -41,14 +41,14 @@ type BuyingStrategy int
// should except... on
const (
PERMANENT BuyingStrategy = iota // is a permanent buying ( predictible )
SUBSCRIPTION BuyingStrategy = iota // is a permanent buying ( predictible )
UNDEFINED_SUBSCRIPTION // a endless subscription ( unpredictible )
SUBSCRIPTION // a defined subscription ( predictible )
PERMANENT // a defined subscription ( predictible )
// PAY_PER_USE // per request. ( unpredictible )
)
func (t BuyingStrategy) String() string {
return [...]string{"PERMANENT", "UNDEFINED_SUBSCRIPTION", "SUBSCRIPTION"}[t]
return [...]string{"SUBSCRIPTION", "UNDEFINED_SUBSCRIPTION", "PERMANENT"}[t]
}
func (t BuyingStrategy) IsBillingStrategyAllowed(bs BillingStrategy) (BillingStrategy, bool) {
@@ -65,7 +65,7 @@ func (t BuyingStrategy) IsBillingStrategyAllowed(bs BillingStrategy) (BillingStr
}
func BuyingStrategyList() []BuyingStrategy {
return []BuyingStrategy{PERMANENT, UNDEFINED_SUBSCRIPTION, SUBSCRIPTION}
return []BuyingStrategy{SUBSCRIPTION, UNDEFINED_SUBSCRIPTION, PERMANENT}
}
type Strategy interface {
@@ -112,7 +112,7 @@ func getAverageTimeInSecond(averageTimeInSecond float64, start time.Time, end *t
fromAverageDuration := after.Sub(now).Seconds()
var tEnd time.Time
if end == nil {
tEnd = start.Add(1 * time.Hour)
tEnd = start.Add(5 * time.Minute)
} else {
tEnd = *end
}
@@ -160,7 +160,8 @@ type PricingStrategy[T Strategy] struct {
}
func (p PricingStrategy[T]) GetPriceHT(amountOfData float64, bookingTimeDuration float64, start time.Time, end *time.Time, variations []*PricingVariation) (float64, error) {
if p.BuyingStrategy == SUBSCRIPTION {
switch p.BuyingStrategy {
case SUBSCRIPTION:
price, err := BookingEstimation(p.GetTimePricingStrategy(), p.Price*float64(amountOfData), bookingTimeDuration, start, end)
if err != nil {
return 0, err
@@ -174,7 +175,7 @@ func (p PricingStrategy[T]) GetPriceHT(amountOfData float64, bookingTimeDuration
return p.Price, nil
} else if p.BuyingStrategy == PERMANENT {
case PERMANENT:
if variations != nil {
price := p.Price
for _, v := range variations {

View File

@@ -63,7 +63,7 @@ func Test_getAverageTimeInSecond_WithoutEnd(t *testing.T) {
func TestBookingEstimation(t *testing.T) {
start := time.Now()
end := start.Add(2 * time.Hour)
end := start.Add(10 * time.Minute)
strategies := map[pricing.TimePricingStrategy]float64{
pricing.ONCE: 50,
pricing.PER_HOUR: 10,
@@ -102,7 +102,7 @@ func TestPricingStrategy_Getters(t *testing.T) {
func TestPricingStrategy_GetPriceHT(t *testing.T) {
start := time.Now()
end := start.Add(1 * time.Hour)
end := start.Add(5 * time.Minute)
// SUBSCRIPTION case
ps := pricing.PricingStrategy[DummyStrategy]{

View File

@@ -0,0 +1,31 @@
package execution_verification
import (
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
)
/*
* ExecutionVerification is a struct that represents a list of workflow executions
* Warning: No user can write (del, post, put) a workflow execution, it is only used by the system
* workflows generate their own executions
*/
type ExecutionVerification struct {
utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name)
WorkflowID string `json:"workflow_id" bson:"workflow_id,omitempty"` // WorkflowID is the ID of the workflow
Payload string `json:"payload" bson:"payload,omitempty"`
IsVerified bool `json:"is_verified" bson:"is_verified,omitempty"`
Validate bool `json:"validate" bson:"validate,omitempty"`
}
func (r *ExecutionVerification) StoreDraftDefault() {
r.IsDraft = false // TODO: TEMPORARY
}
func (d *ExecutionVerification) GetAccessor(request *tools.APIRequest) utils.Accessor {
return NewAccessor(request) // Create a new instance of the accessor
}
func (d *ExecutionVerification) VerifyAuth(callName string, request *tools.APIRequest) bool {
return true
}

View File

@@ -0,0 +1,38 @@
package execution_verification
import (
"cloud.o-forge.io/core/oc-lib/logs"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
)
type ExecutionVerificationMongoAccessor struct {
utils.AbstractAccessor[*ExecutionVerification]
shallow bool
}
func NewAccessor(request *tools.APIRequest) *ExecutionVerificationMongoAccessor {
return &ExecutionVerificationMongoAccessor{
shallow: false,
AbstractAccessor: utils.AbstractAccessor[*ExecutionVerification]{
Logger: logs.CreateLogger(tools.WORKFLOW_EXECUTION.String()), // Create a logger with the data type
Request: request,
Type: tools.WORKFLOW_EXECUTION,
New: func() *ExecutionVerification { return &ExecutionVerification{} },
NotImplemented: []string{"DeleteOne", "StoreOne", "CopyOne"},
},
}
}
func (wfa *ExecutionVerificationMongoAccessor) StoreOne(set utils.DBObject) (utils.DBObject, int, error) {
set.(*ExecutionVerification).IsVerified = false
return utils.GenericStoreOne(set, wfa)
}
func (wfa *ExecutionVerificationMongoAccessor) UpdateOne(set map[string]interface{}, id string) (utils.DBObject, int, error) {
set = map[string]interface{}{
"is_verified": true,
"validate": set["validate"],
}
return utils.GenericUpdateOne(set, id, wfa)
}

View File

@@ -63,7 +63,7 @@ func (r *AbstractLive) GetResourceType() tools.DataType {
}
func (r *AbstractLive) StoreDraftDefault() {
r.IsDraft = true
r.IsDraft = false
}
func (r *AbstractLive) CanDelete() bool {

View File

@@ -9,13 +9,13 @@ import (
"cloud.o-forge.io/core/oc-lib/tools"
)
type computeUnitsMongoAccessor[T LiveInterface] struct {
type liveMongoAccessor[T LiveInterface] struct {
utils.AbstractAccessor[LiveInterface] // AbstractAccessor contains the basic fields of an accessor (model, caller)
}
// New creates a new instance of the computeUnitsMongoAccessor
func NewAccessor[T LiveInterface](t tools.DataType, request *tools.APIRequest) *computeUnitsMongoAccessor[T] {
return &computeUnitsMongoAccessor[T]{
func NewAccessor[T LiveInterface](t tools.DataType, request *tools.APIRequest) *liveMongoAccessor[T] {
return &liveMongoAccessor[T]{
AbstractAccessor: utils.AbstractAccessor[LiveInterface]{
Logger: logs.CreateLogger(t.String()), // Create a logger with the data type
Request: request,
@@ -36,15 +36,15 @@ func NewAccessor[T LiveInterface](t tools.DataType, request *tools.APIRequest) *
/*
* Nothing special here, just the basic CRUD operations
*/
func (a *computeUnitsMongoAccessor[T]) CopyOne(data utils.DBObject) (utils.DBObject, int, error) {
func (a *liveMongoAccessor[T]) CopyOne(data utils.DBObject) (utils.DBObject, int, error) {
// is a publisher... that become a resources.
if data.IsDrafted() {
return nil, 422, errors.New("can't publish a drafted compute units")
}
live := data.(T)
if live.GetMonitorPath() == "" || live.GetID() != "" {
/*if live.GetMonitorPath() == "" || live.GetID() != "" {
return nil, 422, errors.New("publishing is only allowed is it can be monitored and be accessible")
}
}*/
if res, code, err := a.LoadOne(live.GetID()); err != nil {
return nil, code, err
} else {
@@ -57,7 +57,6 @@ func (a *computeUnitsMongoAccessor[T]) CopyOne(data utils.DBObject) (utils.DBObj
if len(live.GetResourcesID()) > 0 {
for _, r := range live.GetResourcesID() {
// TODO dependent of a existing resource
res, code, err := resAccess.LoadOne(r)
if err == nil {
return nil, code, err
@@ -78,7 +77,7 @@ func (a *computeUnitsMongoAccessor[T]) CopyOne(data utils.DBObject) (utils.DBObj
b, _ := json.Marshal(live)
json.Unmarshal(b, &r)
live.SetResourceInstance(r, instance)
res, code, err := resAccess.StoreOne(r)
res, code, err := utils.GenericStoreOne(r, resAccess)
if err != nil {
return nil, code, err
}

View File

@@ -3,6 +3,7 @@ package models
import (
"cloud.o-forge.io/core/oc-lib/logs"
"cloud.o-forge.io/core/oc-lib/models/bill"
"cloud.o-forge.io/core/oc-lib/models/execution_verification"
"cloud.o-forge.io/core/oc-lib/models/live"
"cloud.o-forge.io/core/oc-lib/models/order"
"cloud.o-forge.io/core/oc-lib/models/resources/purchase_resource"
@@ -24,26 +25,27 @@ This package contains the models used in the application
It's used to create the models dynamically
*/
var ModelsCatalog = map[string]func() utils.DBObject{
tools.WORKFLOW_RESOURCE.String(): func() utils.DBObject { return &resource.WorkflowResource{} },
tools.DATA_RESOURCE.String(): func() utils.DBObject { return &resource.DataResource{} },
tools.COMPUTE_RESOURCE.String(): func() utils.DBObject { return &resource.ComputeResource{} },
tools.STORAGE_RESOURCE.String(): func() utils.DBObject { return &resource.StorageResource{} },
tools.PROCESSING_RESOURCE.String(): func() utils.DBObject { return &resource.ProcessingResource{} },
tools.NATIVE_TOOL.String(): func() utils.DBObject { return &resource.NativeTool{} },
tools.WORKFLOW.String(): func() utils.DBObject { return &w2.Workflow{} },
tools.WORKFLOW_EXECUTION.String(): func() utils.DBObject { return &workflow_execution.WorkflowExecution{} },
tools.WORKSPACE.String(): func() utils.DBObject { return &w3.Workspace{} },
tools.PEER.String(): func() utils.DBObject { return &peer.Peer{} },
tools.COLLABORATIVE_AREA.String(): func() utils.DBObject { return &collaborative_area.CollaborativeArea{} },
tools.RULE.String(): func() utils.DBObject { return &rule.Rule{} },
tools.BOOKING.String(): func() utils.DBObject { return &booking.Booking{} },
tools.WORKFLOW_HISTORY.String(): func() utils.DBObject { return &w2.WorkflowHistory{} },
tools.WORKSPACE_HISTORY.String(): func() utils.DBObject { return &w3.WorkspaceHistory{} },
tools.ORDER.String(): func() utils.DBObject { return &order.Order{} },
tools.PURCHASE_RESOURCE.String(): func() utils.DBObject { return &purchase_resource.PurchaseResource{} },
tools.LIVE_DATACENTER.String(): func() utils.DBObject { return &live.LiveDatacenter{} },
tools.LIVE_STORAGE.String(): func() utils.DBObject { return &live.LiveStorage{} },
tools.BILL.String(): func() utils.DBObject { return &bill.Bill{} },
tools.WORKFLOW_RESOURCE.String(): func() utils.DBObject { return &resource.WorkflowResource{} },
tools.DATA_RESOURCE.String(): func() utils.DBObject { return &resource.DataResource{} },
tools.COMPUTE_RESOURCE.String(): func() utils.DBObject { return &resource.ComputeResource{} },
tools.STORAGE_RESOURCE.String(): func() utils.DBObject { return &resource.StorageResource{} },
tools.PROCESSING_RESOURCE.String(): func() utils.DBObject { return &resource.ProcessingResource{} },
tools.NATIVE_TOOL.String(): func() utils.DBObject { return &resource.NativeTool{} },
tools.WORKFLOW.String(): func() utils.DBObject { return &w2.Workflow{} },
tools.WORKFLOW_EXECUTION.String(): func() utils.DBObject { return &workflow_execution.WorkflowExecution{} },
tools.WORKSPACE.String(): func() utils.DBObject { return &w3.Workspace{} },
tools.PEER.String(): func() utils.DBObject { return &peer.Peer{} },
tools.COLLABORATIVE_AREA.String(): func() utils.DBObject { return &collaborative_area.CollaborativeArea{} },
tools.RULE.String(): func() utils.DBObject { return &rule.Rule{} },
tools.BOOKING.String(): func() utils.DBObject { return &booking.Booking{} },
tools.WORKFLOW_HISTORY.String(): func() utils.DBObject { return &w2.WorkflowHistory{} },
tools.WORKSPACE_HISTORY.String(): func() utils.DBObject { return &w3.WorkspaceHistory{} },
tools.ORDER.String(): func() utils.DBObject { return &order.Order{} },
tools.PURCHASE_RESOURCE.String(): func() utils.DBObject { return &purchase_resource.PurchaseResource{} },
tools.LIVE_DATACENTER.String(): func() utils.DBObject { return &live.LiveDatacenter{} },
tools.LIVE_STORAGE.String(): func() utils.DBObject { return &live.LiveStorage{} },
tools.BILL.String(): func() utils.DBObject { return &bill.Bill{} },
tools.EXECUTION_VERIFICATION.String(): func() utils.DBObject { return &execution_verification.ExecutionVerification{} },
}
// Model returns the model object based on the model type

View File

@@ -17,10 +17,10 @@ import (
type Order struct {
utils.AbstractObject
ExecutionsID string `json:"executions_id" bson:"executions_id" validate:"required"`
Status enum.CompletionStatus `json:"status" bson:"status" default:"0"`
Purchases []*purchase_resource.PurchaseResource `json:"purchases" bson:"purchases"`
Bookings []*booking.Booking `json:"bookings" bson:"bookings"`
ExecutionsID string `json:"executions_id" bson:"executions_id" validate:"required"`
Status enum.CompletionStatus `json:"status" bson:"status" default:"0"`
Purchases []*purchase_resource.PurchaseResource `json:"purchases" bson:"purchases"`
Bookings []*booking.Booking `json:"bookings" bson:"bookings"`
Billing map[pricing.BillingStrategy][]*booking.Booking `json:"billing" bson:"billing"`
}

View File

@@ -41,7 +41,6 @@ func CheckPeerStatus(peerID string, appName string) (*Peer, bool) {
return nil, false
}
url := urlFormat(res.(*Peer).APIUrl, tools.PEER) + "/status" // Format the URL
fmt.Println(url)
state, services := api.CheckRemotePeer(url)
res.(*Peer).ServicesState = services // Update the services states of the peer
access.UpdateOne(res.Serialize(res), peerID) // Update the peer in the db

View File

@@ -2,6 +2,7 @@ package resources
import (
"errors"
"fmt"
"strings"
"time"
@@ -35,11 +36,11 @@ func (abs *ComputeResource) ConvertToPricedResource(t tools.DataType, selectedIn
if t != tools.COMPUTE_RESOURCE {
return nil, errors.New("not the proper type expected : cannot convert to priced resource : have " + t.String() + " wait Compute")
}
p, err := abs.AbstractInstanciatedResource.ConvertToPricedResource(t, selectedInstance, selectedPartnership, selectedBuyingStrategy, selectedStrategy, selectedBookingModeIndex, request)
p, err := ConvertToPricedResource[*ComputeResourcePricingProfile](t, selectedInstance, selectedPartnership, selectedBuyingStrategy, selectedStrategy, selectedBookingModeIndex, abs, request)
if err != nil {
return nil, err
}
priced := p.(*PricedResource)
priced := p.(*PricedResource[*ComputeResourcePricingProfile])
return &PricedComputeResource{
PricedResource: *priced,
}, nil
@@ -95,7 +96,8 @@ type ComputeResourcePricingProfile struct {
}
func (p *ComputeResourcePricingProfile) IsPurchasable() bool {
return p.Pricing.BuyingStrategy != pricing.UNDEFINED_SUBSCRIPTION
fmt.Println("Buying", p.Pricing.BuyingStrategy)
return p.Pricing.BuyingStrategy == pricing.PERMANENT
}
func (p *ComputeResourcePricingProfile) GetPurchase() pricing.BuyingStrategy {
@@ -120,7 +122,10 @@ func (p *ComputeResourcePricingProfile) GetPriceHT(amountOfData float64, explici
return 0, errors.New("params must be set")
}
pp := float64(0)
model := params[1]
model := ""
if len(params) > 1 {
model = params[1]
}
if strings.Contains(params[0], "cpus") && len(params) > 1 {
if _, ok := p.CPUsPrices[model]; ok {
p.Pricing.Price = p.CPUsPrices[model]
@@ -156,18 +161,35 @@ func (p *ComputeResourcePricingProfile) GetPriceHT(amountOfData float64, explici
}
type PricedComputeResource struct {
PricedResource
PricedResource[*ComputeResourcePricingProfile]
CPUsLocated map[string]float64 `json:"cpus_in_use" bson:"cpus_in_use"` // CPUsInUse is the list of CPUs in use
GPUsLocated map[string]float64 `json:"gpus_in_use" bson:"gpus_in_use"` // GPUsInUse is the list of GPUs in use
RAMLocated float64 `json:"ram_in_use" bson:"ram_in_use"` // RAMInUse is the RAM in use
}
func (r *PricedComputeResource) ensurePricing() {
if r.SelectedPricing == nil {
r.SelectedPricing = &ComputeResourcePricingProfile{}
}
}
func (r *PricedComputeResource) IsPurchasable() bool {
r.ensurePricing()
return r.SelectedPricing.IsPurchasable()
}
func (r *PricedComputeResource) IsBooked() bool {
r.ensurePricing()
return r.SelectedPricing.IsBooked()
}
func (r *PricedComputeResource) GetType() tools.DataType {
return tools.COMPUTE_RESOURCE
}
func (r *PricedComputeResource) GetPriceHT() (float64, error) {
r.ensurePricing()
if r.BookingConfiguration == nil {
r.BookingConfiguration = &BookingConfiguration{}
}
@@ -176,12 +198,9 @@ func (r *PricedComputeResource) GetPriceHT() (float64, error) {
r.BookingConfiguration.UsageStart = &now
}
if r.BookingConfiguration.UsageEnd == nil {
add := r.BookingConfiguration.UsageStart.Add(time.Duration(1 * time.Hour))
add := r.BookingConfiguration.UsageStart.Add(time.Duration(5 * time.Minute))
r.BookingConfiguration.UsageEnd = &add
}
if r.SelectedPricing == nil {
return 0, errors.New("pricing profile must be set on Priced Compute" + r.ResourceID)
}
pricing := r.SelectedPricing
price := float64(0)
for _, l := range []map[string]float64{r.CPUsLocated, r.GPUsLocated} {

View File

@@ -41,11 +41,11 @@ func (abs *DataResource) ConvertToPricedResource(t tools.DataType, selectedInsta
if t != tools.DATA_RESOURCE {
return nil, errors.New("not the proper type expected : cannot convert to priced resource : have " + t.String() + " wait Data")
}
p, err := abs.AbstractInstanciatedResource.ConvertToPricedResource(t, selectedInstance, selectedPartnership, selectedBuyingStrategy, selectedStrategy, selectedBookingModeIndex, request)
p, err := ConvertToPricedResource[*DataResourcePricingProfile](t, selectedInstance, selectedPartnership, selectedBuyingStrategy, selectedStrategy, selectedBookingModeIndex, abs, request)
if err != nil {
return nil, err
}
priced := p.(*PricedResource)
priced := p.(*PricedResource[*DataResourcePricingProfile])
return &PricedDataResource{
PricedResource: *priced,
}, nil
@@ -151,7 +151,7 @@ func (p *DataResourcePricingProfile) GetOverrideStrategyValue() int {
}
func (p *DataResourcePricingProfile) IsPurchasable() bool {
return p.Pricing.BuyingStrategy != pricing.UNDEFINED_SUBSCRIPTION
return p.Pricing.BuyingStrategy == pricing.PERMANENT
}
func (p *DataResourcePricingProfile) IsBooked() bool {
@@ -160,15 +160,32 @@ func (p *DataResourcePricingProfile) IsBooked() bool {
}
type PricedDataResource struct {
PricedResource
PricedResource[*DataResourcePricingProfile]
UsageStorageGB float64 `json:"storage_gb,omitempty" bson:"storage_gb,omitempty"`
}
func (r *PricedDataResource) ensurePricing() {
if r.SelectedPricing == nil {
r.SelectedPricing = &DataResourcePricingProfile{}
}
}
func (r *PricedDataResource) IsPurchasable() bool {
r.ensurePricing()
return r.SelectedPricing.IsPurchasable()
}
func (r *PricedDataResource) IsBooked() bool {
r.ensurePricing()
return r.SelectedPricing.IsBooked()
}
func (r *PricedDataResource) GetType() tools.DataType {
return tools.DATA_RESOURCE
}
func (r *PricedDataResource) GetPriceHT() (float64, error) {
r.ensurePricing()
if r.BookingConfiguration == nil {
r.BookingConfiguration = &BookingConfiguration{}
}
@@ -177,12 +194,9 @@ func (r *PricedDataResource) GetPriceHT() (float64, error) {
r.BookingConfiguration.UsageStart = &now
}
if r.BookingConfiguration.UsageEnd == nil {
add := r.BookingConfiguration.UsageStart.Add(time.Duration(1 * time.Hour))
add := r.BookingConfiguration.UsageStart.Add(time.Duration(5 * time.Minute))
r.BookingConfiguration.UsageEnd = &add
}
if r.SelectedPricing == nil {
return 0, errors.New("pricing profile must be set on Priced Data" + r.ResourceID)
}
pricing := r.SelectedPricing
var err error
amountOfData := float64(1)

View File

@@ -8,6 +8,10 @@ import (
"cloud.o-forge.io/core/oc-lib/tools"
)
type PricedResourceITF interface {
pricing.PricedItemITF
}
type ResourceInterface interface {
utils.DBObject
FilterPeer(peerID string) *dbs.Filters
@@ -15,7 +19,7 @@ type ResourceInterface interface {
ConvertToPricedResource(t tools.DataType, a *int, selectedPartnership *int, selectedBuyingStrategy *int, selectedStrategy *int, b *int, request *tools.APIRequest) (pricing.PricedItemITF, error)
GetType() string
ClearEnv() utils.DBObject
SetAllowedInstances(request *tools.APIRequest, instance_id ...string)
SetAllowedInstances(request *tools.APIRequest, instance_id ...string) []ResourceInstanceITF
AddInstances(instance ResourceInstanceITF)
GetSelectedInstance(index *int) ResourceInstanceITF
}
@@ -31,6 +35,8 @@ type ResourceInstanceITF interface {
GetPricingsProfiles(peerID string, groups []string) []pricing.PricingProfileITF
GetPeerGroups() ([]ResourcePartnerITF, []map[string][]string)
ClearPeerGroups()
GetAverageDurationS() float64
UpdateAverageDuration(actualS float64)
}
type ResourcePartnerITF interface {

View File

@@ -37,12 +37,13 @@ func (d *NativeTool) ClearEnv() utils.DBObject {
return d
}
func (w *NativeTool) SetAllowedInstances(request *tools.APIRequest, ids ...string) {
func (w *NativeTool) SetAllowedInstances(request *tools.APIRequest, ids ...string) []ResourceInstanceITF {
/* EMPTY */
return []ResourceInstanceITF{}
}
func (w *NativeTool) ConvertToPricedResource(t tools.DataType, selectedInstance *int, selectedPartnership *int, selectedBuyingStrategy *int, selectedStrategy *int, selectedBookingModeIndex *int, request *tools.APIRequest) (pricing.PricedItemITF, error) {
return &PricedResource{
return &PricedResource[*pricing.ExploitPricingProfile[pricing.TimePricingStrategy]]{
Name: w.Name,
Logo: w.Logo,
ResourceID: w.UUID,

View File

@@ -16,11 +16,11 @@ type BookingConfiguration struct {
Mode booking.BookingMode `json:"mode,omitempty" bson:"mode,omitempty"`
}
type PricedResource struct {
type PricedResource[T pricing.PricingProfileITF] struct {
Name string `json:"name,omitempty" bson:"name,omitempty"`
Logo string `json:"logo,omitempty" bson:"logo,omitempty"`
InstancesRefs map[string]string `json:"instances_refs,omitempty" bson:"instances_refs,omitempty"`
SelectedPricing pricing.PricingProfileITF `json:"selected_pricing,omitempty" bson:"selected_pricing,omitempty"`
SelectedPricing T `json:"selected_pricing,omitempty" bson:"selected_pricing,omitempty"`
Quantity int `json:"quantity,omitempty" bson:"quantity,omitempty"`
BookingConfiguration *BookingConfiguration `json:"booking_configuration,omitempty" bson:"booking_configuration,omitempty"`
Variations []*pricing.PricingVariation `json:"pricing_variations" bson:"pricing_variations"`
@@ -31,101 +31,107 @@ type PricedResource struct {
ResourceType tools.DataType `json:"resource_type,omitempty" bson:"resource_type,omitempty"`
}
func (abs *PricedResource) GetQuantity() int {
func (abs *PricedResource[T]) GetQuantity() int {
return abs.Quantity
}
func (abs *PricedResource) AddQuantity(amount int) {
func (abs *PricedResource[T]) AddQuantity(amount int) {
abs.Quantity += amount
}
func (abs *PricedResource) SelectPricing() pricing.PricingProfileITF {
func (abs *PricedResource[T]) SelectPricing() pricing.PricingProfileITF {
return abs.SelectedPricing
}
func (abs *PricedResource) GetID() string {
func (abs *PricedResource[T]) GetID() string {
return abs.ResourceID
}
func (abs *PricedResource) GetInstanceID() string {
func (abs *PricedResource[T]) GetInstanceID() string {
return abs.InstanceID
}
func (abs *PricedResource) GetType() tools.DataType {
func (abs *PricedResource[T]) GetType() tools.DataType {
return abs.ResourceType
}
func (abs *PricedResource) GetCreatorID() string {
func (abs *PricedResource[T]) GetCreatorID() string {
return abs.CreatorID
}
func (abs *PricedResource) IsPurchasable() bool {
if abs.SelectedPricing == nil {
// IsPurchasable and IsBooked fall back to false when SelectedPricing is a nil interface.
// Concrete types (PricedComputeResource, etc.) override these and guarantee non-nil pricing.
func (abs *PricedResource[T]) IsPurchasable() bool {
if any(abs.SelectedPricing) == nil {
return false
}
return (abs.SelectedPricing).IsPurchasable()
return abs.SelectedPricing.IsPurchasable()
}
func (abs *PricedResource) IsBooked() bool {
if abs.SelectedPricing == nil {
func (abs *PricedResource[T]) IsBooked() bool {
if any(abs.SelectedPricing) == nil {
return false
}
return (abs.SelectedPricing).IsBooked()
return abs.SelectedPricing.IsBooked()
}
func (abs *PricedResource) GetLocationEnd() *time.Time {
func (abs *PricedResource[T]) GetLocationEnd() *time.Time {
if abs.BookingConfiguration == nil {
return nil
}
return abs.BookingConfiguration.UsageEnd
}
func (abs *PricedResource) GetLocationStart() *time.Time {
func (abs *PricedResource[T]) GetLocationStart() *time.Time {
if abs.BookingConfiguration == nil {
return nil
}
return abs.BookingConfiguration.UsageStart
}
func (abs *PricedResource) SetLocationStart(start time.Time) {
func (abs *PricedResource[T]) SetLocationStart(start time.Time) {
if abs.BookingConfiguration == nil {
abs.BookingConfiguration = &BookingConfiguration{}
}
abs.BookingConfiguration.UsageStart = &start
}
func (abs *PricedResource) SetLocationEnd(end time.Time) {
func (abs *PricedResource[T]) SetLocationEnd(end time.Time) {
if abs.BookingConfiguration == nil {
abs.BookingConfiguration = &BookingConfiguration{}
}
abs.BookingConfiguration.UsageEnd = &end
}
func (abs *PricedResource) GetBookingMode() booking.BookingMode {
func (abs *PricedResource[T]) GetBookingMode() booking.BookingMode {
if abs.BookingConfiguration == nil {
return booking.WHEN_POSSIBLE
}
return abs.BookingConfiguration.Mode
}
func (abs *PricedResource) GetExplicitDurationInS() float64 {
func (abs *PricedResource[T]) GetExplicitDurationInS() float64 {
if abs.BookingConfiguration == nil {
abs.BookingConfiguration = &BookingConfiguration{}
}
if abs.BookingConfiguration.ExplicitBookingDurationS == 0 {
if abs.BookingConfiguration.UsageEnd == nil && abs.BookingConfiguration.UsageStart == nil {
return time.Duration(1 * time.Hour).Seconds()
return (5 * time.Minute).Seconds()
}
if abs.BookingConfiguration.UsageEnd == nil {
add := abs.BookingConfiguration.UsageStart.Add(time.Duration(1 * time.Hour))
add := abs.BookingConfiguration.UsageStart.Add(5 * time.Minute)
abs.BookingConfiguration.UsageEnd = &add
}
return abs.BookingConfiguration.UsageEnd.Sub(*abs.BookingConfiguration.UsageStart).Seconds()
d := abs.BookingConfiguration.UsageEnd.Sub(*abs.BookingConfiguration.UsageStart).Seconds()
if d <= 0 {
return (5 * time.Minute).Seconds()
}
return d
}
return abs.BookingConfiguration.ExplicitBookingDurationS
}
func (r *PricedResource) GetPriceHT() (float64, error) {
func (r *PricedResource[T]) GetPriceHT() (float64, error) {
now := time.Now()
if r.BookingConfiguration == nil {
r.BookingConfiguration = &BookingConfiguration{}
@@ -134,11 +140,11 @@ func (r *PricedResource) GetPriceHT() (float64, error) {
r.BookingConfiguration.UsageStart = &now
}
if r.BookingConfiguration.UsageEnd == nil {
add := r.BookingConfiguration.UsageStart.Add(time.Duration(1 * time.Hour))
add := r.BookingConfiguration.UsageStart.Add(time.Duration(5 * time.Minute))
r.BookingConfiguration.UsageEnd = &add
}
if r.SelectedPricing == nil {
return 0, errors.New("pricing profile must be set on Priced Resource " + r.ResourceID)
if any(r.SelectedPricing) == nil {
return 0, errors.New("pricing profile must be set for resource " + r.ResourceID)
}
pricing := r.SelectedPricing
return pricing.GetPriceHT(1, 0, *r.BookingConfiguration.UsageStart, *r.BookingConfiguration.UsageEnd, r.Variations)

View File

@@ -1,6 +1,7 @@
package resources
import (
"errors"
"time"
"cloud.o-forge.io/core/oc-lib/models/common/enum"
@@ -65,10 +66,31 @@ type ProcessingResourcePartnership struct {
}
type PricedProcessingResource struct {
PricedResource
PricedResource[*ProcessingResourcePricingProfile]
IsService bool
}
func (r *PricedProcessingResource) ensurePricing() {
if r.SelectedPricing == nil {
r.SelectedPricing = &ProcessingResourcePricingProfile{}
}
}
func (r *PricedProcessingResource) IsPurchasable() bool {
r.ensurePricing()
return r.SelectedPricing.IsPurchasable()
}
func (r *PricedProcessingResource) IsBooked() bool {
r.ensurePricing()
return r.SelectedPricing.IsBooked()
}
func (r *PricedProcessingResource) GetPriceHT() (float64, error) {
r.ensurePricing()
return r.PricedResource.GetPriceHT()
}
func (r *PricedProcessingResource) GetType() tools.DataType {
return tools.PROCESSING_RESOURCE
}
@@ -82,7 +104,7 @@ func (a *PricedProcessingResource) GetExplicitDurationInS() float64 {
if a.IsService {
return -1
}
return time.Duration(1 * time.Hour).Seconds()
return (5 * time.Minute).Seconds()
}
return a.BookingConfiguration.UsageEnd.Sub(*a.BookingConfiguration.UsageStart).Seconds()
}
@@ -93,12 +115,26 @@ func (d *ProcessingResource) GetAccessor(request *tools.APIRequest) utils.Access
return NewAccessor[*ProcessingResource](tools.PROCESSING_RESOURCE, request, func() utils.DBObject { return &ProcessingResource{} }) // Create a new instance of the accessor
}
func (abs *ProcessingResource) ConvertToPricedResource(t tools.DataType, selectedInstance *int, selectedPartnership *int, selectedBuyingStrategy *int, selectedStrategy *int, selectedBookingModeIndex *int, request *tools.APIRequest) (pricing.PricedItemITF, error) {
if t != tools.PROCESSING_RESOURCE {
return nil, errors.New("not the proper type expected : cannot convert to priced resource : have " + t.String() + " wait Data")
}
p, err := ConvertToPricedResource[*DataResourcePricingProfile](t, selectedInstance, selectedPartnership, selectedBuyingStrategy, selectedStrategy, selectedBookingModeIndex, abs, request)
if err != nil {
return nil, err
}
priced := p.(*PricedResource[*DataResourcePricingProfile])
return &PricedDataResource{
PricedResource: *priced,
}, nil
}
type ProcessingResourcePricingProfile struct {
pricing.AccessPricingProfile[pricing.TimePricingStrategy] // AccessPricingProfile is the pricing profile of a data it means that we can access the data for an amount of time
}
func (p *ProcessingResourcePricingProfile) IsPurchasable() bool {
return p.Pricing.BuyingStrategy != pricing.UNDEFINED_SUBSCRIPTION
return p.Pricing.BuyingStrategy == pricing.PERMANENT
}
func (p *ProcessingResourcePricingProfile) IsBooked() bool {

View File

@@ -19,6 +19,9 @@ type PurchaseResource struct {
InstanceID string `json:"instance_id,omitempty" bson:"instance_id,omitempty" validate:"required"` // could be a Compute or a Storage
ResourceType tools.DataType `json:"resource_type" bson:"resource_type" validate:"required"`
// Authorization: identifies who created this draft and the Check session it belongs to.
SchedulerPeerID string `json:"scheduler_peer_id,omitempty" bson:"scheduler_peer_id,omitempty"`
}
func (d *PurchaseResource) GetAccessor(request *tools.APIRequest) utils.Accessor {

View File

@@ -36,8 +36,8 @@ func TestCanUpdate(t *testing.T) {
func TestCanDelete(t *testing.T) {
now := time.Now().UTC()
past := now.Add(-1 * time.Hour)
future := now.Add(1 * time.Hour)
past := now.Add(-5 * time.Minute)
future := now.Add(5 * time.Minute)
t.Run("nil EndDate", func(t *testing.T) {
r := &purchase_resource.PurchaseResource{}

View File

@@ -21,11 +21,11 @@ import (
type AbstractResource struct {
utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name)
Type string `json:"type,omitempty" bson:"type,omitempty"` // Type is the type of the resource
Logo string `json:"logo,omitempty" bson:"logo,omitempty" validate:"required"` // Logo is the logo of the resource
Description string `json:"description,omitempty" bson:"description,omitempty"` // Description is the description of the resource
ShortDescription string `json:"short_description,omitempty" bson:"short_description,omitempty" validate:"required"` // ShortDescription is the short description of the resource
Owners []utils.Owner `json:"owners,omitempty" bson:"owners,omitempty"` // Owners is the list of owners of the resource
Type string `json:"type,omitempty" bson:"type,omitempty"` // Type is the type of the resource
Logo string `json:"logo,omitempty" bson:"logo,omitempty"` // Logo is the logo of the resource
Description string `json:"description,omitempty" bson:"description,omitempty"` // Description is the description of the resource
ShortDescription string `json:"short_description,omitempty" bson:"short_description,omitempty"` // ShortDescription is the short description of the resource
Owners []utils.Owner `json:"owners,omitempty" bson:"owners,omitempty"` // Owners is the list of owners of the resource
UsageRestrictions string `bson:"usage_restrictions,omitempty" json:"usage_restrictions,omitempty"`
AllowedBookingModes map[booking.BookingMode]*pricing.PricingVariation `bson:"allowed_booking_modes" json:"allowed_booking_modes"`
}
@@ -66,16 +66,16 @@ func (r *AbstractResource) CanDelete() bool {
type AbstractInstanciatedResource[T ResourceInstanceITF] struct {
AbstractResource // AbstractResource contains the basic fields of an object (id, name)
Instances []T `json:"instances,omitempty" bson:"instances,omitempty"` // Bill is the bill of the resource // Bill is the bill of the resource
Instances []T `json:"instances,omitempty" bson:"instances,omitempty"`
}
func (abs *AbstractInstanciatedResource[T]) AddInstances(instance ResourceInstanceITF) {
abs.Instances = append(abs.Instances, instance.(T))
}
func (abs *AbstractInstanciatedResource[T]) ConvertToPricedResource(t tools.DataType,
func ConvertToPricedResource[T pricing.PricingProfileITF](t tools.DataType,
selectedInstance *int, selectedPartnership *int, selectedBuyingStrategy *int, selectedStrategy *int,
selectedBookingModeIndex *int, request *tools.APIRequest) (pricing.PricedItemITF, error) {
selectedBookingModeIndex *int, abs ResourceInterface, request *tools.APIRequest) (pricing.PricedItemITF, error) {
instances := map[string]string{}
var profile pricing.PricingProfileITF
var inst ResourceInstanceITF
@@ -84,7 +84,7 @@ func (abs *AbstractInstanciatedResource[T]) ConvertToPricedResource(t tools.Data
instances[t.GetID()] = t.GetName()
profile = t.GetProfile(request.PeerID, selectedPartnership, selectedBuyingStrategy, selectedStrategy)
} else {
for i, instance := range abs.Instances { // TODO why it crush before ?
for i, instance := range abs.SetAllowedInstances(request) { // TODO why it crush before ?
if i == 0 {
inst = instance
}
@@ -106,20 +106,33 @@ func (abs *AbstractInstanciatedResource[T]) ConvertToPricedResource(t tools.Data
}*/
}
variations := []*pricing.PricingVariation{}
if selectedBookingModeIndex != nil && abs.AllowedBookingModes[booking.BookingMode(*selectedBookingModeIndex)] != nil {
variations = append(variations, abs.AllowedBookingModes[booking.BookingMode(*selectedBookingModeIndex)])
if selectedBookingModeIndex != nil && abs.GetBookingModes()[booking.BookingMode(*selectedBookingModeIndex)] != nil {
variations = append(variations, abs.GetBookingModes()[booking.BookingMode(*selectedBookingModeIndex)])
}
return &PricedResource{
Name: abs.Name,
Logo: abs.Logo,
ResourceID: abs.UUID,
InstanceID: inst.GetID(),
ResourceType: t,
Quantity: 1,
InstancesRefs: instances,
SelectedPricing: profile,
Variations: variations,
CreatorID: abs.CreatorID,
// Seed the booking configuration with the instance's historical average duration
// so GetExplicitDurationInS() returns a realistic default out of the box.
var bc *BookingConfiguration
if inst != nil {
if avg := inst.GetAverageDurationS(); avg > 0 {
bc = &BookingConfiguration{ExplicitBookingDurationS: avg}
}
}
instanceID := ""
if inst != nil {
instanceID = inst.GetID()
}
selectedPricing, _ := profile.(T)
return &PricedResource[T]{
Name: abs.GetName(),
ResourceID: abs.GetID(),
InstanceID: instanceID,
ResourceType: t,
Quantity: 1,
InstancesRefs: instances,
SelectedPricing: selectedPricing,
Variations: variations,
CreatorID: abs.GetCreatorID(),
BookingConfiguration: bc,
}, nil
}
@@ -140,11 +153,16 @@ func (r *AbstractInstanciatedResource[T]) GetSelectedInstance(selected *int) Res
return nil
}
func (abs *AbstractInstanciatedResource[T]) SetAllowedInstances(request *tools.APIRequest, instanceID ...string) {
if (request != nil && request.PeerID == abs.CreatorID && request.PeerID != "") || request.Admin {
return
func (abs *AbstractInstanciatedResource[T]) SetAllowedInstances(request *tools.APIRequest, instanceID ...string) []ResourceInstanceITF {
if !((request != nil && request.PeerID == abs.CreatorID && request.PeerID != "") || request.Admin) {
abs.Instances = VerifyAuthAction(abs.Instances, request, instanceID...)
}
abs.Instances = VerifyAuthAction(abs.Instances, request, instanceID...)
inst := []ResourceInstanceITF{}
for _, i := range abs.Instances {
inst = append(inst, i)
}
return inst
}
func (abs *AbstractInstanciatedResource[T]) VerifyAuth(callName string, request *tools.APIRequest) bool {
@@ -196,6 +214,9 @@ type ResourceInstance[T ResourcePartnerITF] struct {
Outputs []models.Param `json:"outputs,omitempty" bson:"outputs,omitempty"`
Partnerships []T `json:"partnerships,omitempty" bson:"partnerships,omitempty"`
AverageDurationS float64 `json:"average_duration_s,omitempty" bson:"average_duration_s,omitempty"`
AverageDurationSamples int `json:"average_duration_samples,omitempty" bson:"average_duration_samples,omitempty"`
}
// TODO should kicks all selection
@@ -285,6 +306,17 @@ func (ri *ResourceInstance[T]) ClearPeerGroups() {
}
}
func (ri *ResourceInstance[T]) GetAverageDurationS() float64 {
return ri.AverageDurationS
}
func (ri *ResourceInstance[T]) UpdateAverageDuration(actualS float64) {
buffered := actualS * 1.20
n := float64(ri.AverageDurationSamples)
ri.AverageDurationS = (ri.AverageDurationS*n + buffered) / (n + 1)
ri.AverageDurationSamples++
}
type ResourcePartnerShip[T pricing.PricingProfileITF] struct {
Namespace string `json:"namespace" bson:"namespace" default:"default-namespace"`
PeerGroups map[string][]string `json:"peer_groups,omitempty" bson:"peer_groups,omitempty"`

View File

@@ -2,7 +2,6 @@ package resources
import (
"errors"
"fmt"
"time"
"cloud.o-forge.io/core/oc-lib/models/common/enum"
@@ -35,11 +34,11 @@ func (abs *StorageResource) ConvertToPricedResource(t tools.DataType, selectedIn
if t != tools.STORAGE_RESOURCE {
return nil, errors.New("not the proper type expected : cannot convert to priced resource : have " + t.String() + " wait Storage")
}
p, err := abs.AbstractInstanciatedResource.ConvertToPricedResource(t, selectedInstance, selectedPartnership, selectedBuyingStrategy, selectedStrategy, selectedBookingModeIndex, request)
p, err := ConvertToPricedResource[*StorageResourcePricingProfile](t, selectedInstance, selectedPartnership, selectedBuyingStrategy, selectedStrategy, selectedBookingModeIndex, abs, request)
if err != nil {
return nil, err
}
priced := p.(*PricedResource)
priced := p.(*PricedResource[*StorageResourcePricingProfile])
return &PricedStorageResource{
PricedResource: *priced,
}, nil
@@ -170,7 +169,7 @@ type StorageResourcePricingProfile struct {
}
func (p *StorageResourcePricingProfile) IsPurchasable() bool {
return p.Pricing.BuyingStrategy != pricing.UNDEFINED_SUBSCRIPTION
return p.Pricing.BuyingStrategy == pricing.PERMANENT
}
func (p *StorageResourcePricingProfile) IsBooked() bool {
@@ -181,30 +180,43 @@ func (p *StorageResourcePricingProfile) IsBooked() bool {
}
type PricedStorageResource struct {
PricedResource
PricedResource[*StorageResourcePricingProfile]
UsageStorageGB float64 `json:"storage_gb,omitempty" bson:"storage_gb,omitempty"`
}
func (r *PricedStorageResource) ensurePricing() {
if r.SelectedPricing == nil {
r.SelectedPricing = &StorageResourcePricingProfile{}
}
}
func (r *PricedStorageResource) IsPurchasable() bool {
r.ensurePricing()
return r.SelectedPricing.IsPurchasable()
}
func (r *PricedStorageResource) IsBooked() bool {
r.ensurePricing()
return r.SelectedPricing.IsBooked()
}
func (r *PricedStorageResource) GetType() tools.DataType {
return tools.STORAGE_RESOURCE
}
func (r *PricedStorageResource) GetPriceHT() (float64, error) {
r.ensurePricing()
if r.BookingConfiguration == nil {
r.BookingConfiguration = &BookingConfiguration{}
}
fmt.Println("GetPriceHT", r.BookingConfiguration.UsageStart, r.BookingConfiguration.UsageEnd)
now := time.Now()
if r.BookingConfiguration.UsageStart == nil {
r.BookingConfiguration.UsageStart = &now
}
if r.BookingConfiguration.UsageEnd == nil {
add := r.BookingConfiguration.UsageStart.Add(time.Duration(1 * time.Hour))
add := r.BookingConfiguration.UsageStart.Add(time.Duration(5 * time.Minute))
r.BookingConfiguration.UsageEnd = &add
}
if r.SelectedPricing == nil {
return 0, errors.New("pricing profile must be set on Priced Storage" + r.ResourceID)
}
pricing := r.SelectedPricing
var err error
amountOfData := float64(1)

View File

@@ -37,7 +37,7 @@ func TestComputeResource_ConvertToPricedResource(t *testing.T) {
func TestComputeResourcePricingProfile_GetPriceHT_CPUs(t *testing.T) {
start := time.Now()
end := start.Add(1 * time.Hour)
end := start.Add(5 * time.Minute)
profile := resources.ComputeResourcePricingProfile{
CPUsPrices: map[string]float64{"Xeon": 2.0},
ExploitPricingProfile: pricing.ExploitPricingProfile[pricing.TimePricingStrategy]{
@@ -61,11 +61,18 @@ func TestComputeResourcePricingProfile_GetPriceHT_InvalidParams(t *testing.T) {
func TestPricedComputeResource_GetPriceHT(t *testing.T) {
start := time.Now()
end := start.Add(1 * time.Hour)
end := start.Add(5 * time.Minute)
r := resources.PricedComputeResource{
PricedResource: resources.PricedResource{
ResourceID: "comp456",
SelectedPricing: &MockPricingProfile{ReturnCost: 1.0},
PricedResource: resources.PricedResource[*resources.ComputeResourcePricingProfile]{
ResourceID: "comp456",
SelectedPricing: &resources.ComputeResourcePricingProfile{
CPUsPrices: map[string]float64{"Xeon": 2.0},
ExploitPricingProfile: pricing.ExploitPricingProfile[pricing.TimePricingStrategy]{
AccessPricingProfile: pricing.AccessPricingProfile[pricing.TimePricingStrategy]{
Pricing: pricing.PricingStrategy[pricing.TimePricingStrategy]{Price: 1.0},
},
},
},
BookingConfiguration: &resources.BookingConfiguration{
UsageStart: &start,
UsageEnd: &end,
@@ -73,8 +80,8 @@ func TestPricedComputeResource_GetPriceHT(t *testing.T) {
},
},
CPUsLocated: map[string]float64{"Xeon": 2},
GPUsLocated: map[string]float64{"Tesla": 1},
RAMLocated: 4,
GPUsLocated: map[string]float64{},
RAMLocated: 0,
}
price, err := r.GetPriceHT()
@@ -84,7 +91,7 @@ func TestPricedComputeResource_GetPriceHT(t *testing.T) {
func TestPricedComputeResource_GetPriceHT_MissingProfile(t *testing.T) {
r := resources.PricedComputeResource{
PricedResource: resources.PricedResource{
PricedResource: resources.PricedResource[*resources.ComputeResourcePricingProfile]{
ResourceID: "comp789",
},
}

View File

@@ -76,13 +76,13 @@ func TestDataResourcePricingStrategy_GetQuantity(t *testing.T) {
func TestDataResourcePricingProfile_IsPurchased(t *testing.T) {
profile := &resources.DataResourcePricingProfile{}
profile.Pricing.BuyingStrategy = pricing.SUBSCRIPTION
profile.Pricing.BuyingStrategy = pricing.PERMANENT
assert.True(t, profile.IsPurchasable())
}
func TestPricedDataResource_GetPriceHT(t *testing.T) {
now := time.Now()
later := now.Add(1 * time.Hour)
later := now.Add(5 * time.Minute)
mockPrice := 42.0
pricingProfile := &resources.DataResourcePricingProfile{AccessPricingProfile: pricing.AccessPricingProfile[resources.DataResourcePricingStrategy]{
@@ -91,7 +91,7 @@ func TestPricedDataResource_GetPriceHT(t *testing.T) {
pricingProfile.Pricing.OverrideStrategy = resources.PER_GB_DOWNLOADED
r := &resources.PricedDataResource{
PricedResource: resources.PricedResource{
PricedResource: resources.PricedResource[*resources.DataResourcePricingProfile]{
SelectedPricing: pricingProfile,
BookingConfiguration: &resources.BookingConfiguration{
UsageStart: &now,
@@ -107,7 +107,7 @@ func TestPricedDataResource_GetPriceHT(t *testing.T) {
func TestPricedDataResource_GetPriceHT_NoProfiles(t *testing.T) {
r := &resources.PricedDataResource{
PricedResource: resources.PricedResource{
PricedResource: resources.PricedResource[*resources.DataResourcePricingProfile]{
ResourceID: "test-resource",
},
}

View File

@@ -36,7 +36,7 @@ func (m *MockPricingProfile) GetPriceHT(amount float64, explicitDuration float64
// ---- Tests ----
func TestGetIDAndCreatorAndType(t *testing.T) {
r := resources.PricedResource{
r := resources.PricedResource[pricing.PricingProfileITF]{
ResourceID: "res-123",
CreatorID: "user-abc",
ResourceType: tools.DATA_RESOURCE,
@@ -48,23 +48,23 @@ func TestGetIDAndCreatorAndType(t *testing.T) {
func TestIsPurchased(t *testing.T) {
t.Run("nil selected pricing returns false", func(t *testing.T) {
r := &resources.PricedResource{}
r := &resources.PricedResource[pricing.PricingProfileITF]{}
assert.False(t, r.IsPurchasable())
})
t.Run("returns true if pricing profile is purchased", func(t *testing.T) {
mock := &MockPricingProfile{Purchased: true}
r := &resources.PricedResource{SelectedPricing: mock}
r := &resources.PricedResource[pricing.PricingProfileITF]{SelectedPricing: mock}
assert.True(t, r.IsPurchasable())
})
}
func TestGetAndSetLocationStartEnd(t *testing.T) {
r := &resources.PricedResource{}
r := &resources.PricedResource[pricing.PricingProfileITF]{}
now := time.Now()
r.SetLocationStart(now)
r.SetLocationEnd(now.Add(2 * time.Hour))
r.SetLocationEnd(now.Add(10 * time.Minute))
assert.Equal(t, now, *r.GetLocationStart())
assert.Equal(t, now.Add(2*time.Hour), *r.GetLocationEnd())
@@ -72,7 +72,7 @@ func TestGetAndSetLocationStartEnd(t *testing.T) {
func TestGetExplicitDurationInS(t *testing.T) {
t.Run("uses explicit duration if set", func(t *testing.T) {
r := &resources.PricedResource{BookingConfiguration: &resources.BookingConfiguration{
r := &resources.PricedResource[pricing.PricingProfileITF]{BookingConfiguration: &resources.BookingConfiguration{
ExplicitBookingDurationS: 3600,
},
}
@@ -81,8 +81,8 @@ func TestGetExplicitDurationInS(t *testing.T) {
t.Run("computes duration from start and end", func(t *testing.T) {
start := time.Now()
end := start.Add(2 * time.Hour)
r := &resources.PricedResource{
end := start.Add(10 * time.Minute)
r := &resources.PricedResource[pricing.PricingProfileITF]{
BookingConfiguration: &resources.BookingConfiguration{
UsageStart: &start, UsageEnd: &end,
},
@@ -91,14 +91,14 @@ func TestGetExplicitDurationInS(t *testing.T) {
})
t.Run("defaults to 1 hour when times not set", func(t *testing.T) {
r := &resources.PricedResource{}
r := &resources.PricedResource[pricing.PricingProfileITF]{}
assert.InDelta(t, 3600.0, r.GetExplicitDurationInS(), 0.1)
})
}
func TestGetPriceHT(t *testing.T) {
t.Run("returns error if no pricing profile", func(t *testing.T) {
r := &resources.PricedResource{ResourceID: "no-profile"}
r := &resources.PricedResource[pricing.PricingProfileITF]{ResourceID: "no-profile"}
price, err := r.GetPriceHT()
require.Error(t, err)
assert.Contains(t, err.Error(), "pricing profile must be set")
@@ -107,7 +107,7 @@ func TestGetPriceHT(t *testing.T) {
t.Run("defaults BookingConfiguration when nil", func(t *testing.T) {
mock := &MockPricingProfile{ReturnCost: 42.0}
r := &resources.PricedResource{
r := &resources.PricedResource[pricing.PricingProfileITF]{
SelectedPricing: mock,
}
price, err := r.GetPriceHT()
@@ -117,9 +117,9 @@ func TestGetPriceHT(t *testing.T) {
t.Run("returns error if profile GetPriceHT fails", func(t *testing.T) {
start := time.Now()
end := start.Add(1 * time.Hour)
end := start.Add(5 * time.Minute)
mock := &MockPricingProfile{ReturnErr: true}
r := &resources.PricedResource{
r := &resources.PricedResource[pricing.PricingProfileITF]{
SelectedPricing: mock,
BookingConfiguration: &resources.BookingConfiguration{
UsageStart: &start,
@@ -133,9 +133,9 @@ func TestGetPriceHT(t *testing.T) {
t.Run("uses SelectedPricing if set", func(t *testing.T) {
start := time.Now()
end := start.Add(1 * time.Hour)
end := start.Add(5 * time.Minute)
mock := &MockPricingProfile{ReturnCost: 10.0}
r := &resources.PricedResource{
r := &resources.PricedResource[pricing.PricingProfileITF]{
SelectedPricing: mock,
BookingConfiguration: &resources.BookingConfiguration{
UsageStart: &start,

View File

@@ -23,7 +23,7 @@ func TestPricedProcessingResource_GetType(t *testing.T) {
func TestPricedProcessingResource_GetExplicitDurationInS(t *testing.T) {
now := time.Now()
after := now.Add(2 * time.Hour)
after := now.Add(10 * time.Minute)
tests := []struct {
name string
@@ -40,30 +40,30 @@ func TestPricedProcessingResource_GetExplicitDurationInS(t *testing.T) {
{
name: "Nil start time, non-service",
input: PricedProcessingResource{
PricedResource: PricedResource{
PricedResource: PricedResource[*ProcessingResourcePricingProfile]{
BookingConfiguration: &resources.BookingConfiguration{
UsageStart: nil,
},
},
},
expected: float64((1 * time.Hour).Seconds()),
expected: float64((5 * time.Minute).Seconds()),
},
{
name: "Duration computed from start and end",
input: PricedProcessingResource{
PricedResource: PricedResource{
PricedResource: PricedResource[*ProcessingResourcePricingProfile]{
BookingConfiguration: &resources.BookingConfiguration{
UsageStart: &now,
UsageEnd: &after,
},
},
},
expected: float64((2 * time.Hour).Seconds()),
expected: float64((10 * time.Minute).Seconds()),
},
{
name: "Explicit duration takes precedence",
input: PricedProcessingResource{
PricedResource: PricedResource{
PricedResource: PricedResource[*ProcessingResourcePricingProfile]{
BookingConfiguration: &resources.BookingConfiguration{
ExplicitBookingDurationS: 1337,
},
@@ -89,14 +89,14 @@ func TestProcessingResource_GetAccessor(t *testing.T) {
func TestProcessingResourcePricingProfile_GetPriceHT(t *testing.T) {
start := time.Now()
end := start.Add(2 * time.Hour)
end := start.Add(10 * time.Minute)
mockPricing := pricing.AccessPricingProfile[pricing.TimePricingStrategy]{
Pricing: pricing.PricingStrategy[pricing.TimePricingStrategy]{
Price: 100.0,
},
}
profile := &ProcessingResourcePricingProfile{AccessPricingProfile: mockPricing}
price, err := profile.GetPriceHT(0, 0, start, end, []*pricing.PricingVariation{})
price, err := profile.GetPriceHT(1, 0, start, end, []*pricing.PricingVariation{})
assert.NoError(t, err)
assert.Equal(t, 100.0, price)
}

View File

@@ -81,8 +81,8 @@ func TestGetSelectedInstance_NoIndex(t *testing.T) {
}
func TestCanUpdate_WhenOnlyStateDiffers(t *testing.T) {
resource := &resources.AbstractResource{AbstractObject: utils.AbstractObject{IsDraft: false}}
set := &MockDBObject{isDraft: true}
resource := &resources.AbstractResource{AbstractObject: utils.AbstractObject{IsDraft: true}}
set := &MockDBObject{isDraft: false}
canUpdate, updated := resource.CanUpdate(set)
assert.True(t, canUpdate)
assert.Equal(t, set, updated)
@@ -105,8 +105,13 @@ type FakeResource struct {
resources.AbstractInstanciatedResource[*MockInstance]
}
func (f *FakeResource) SetAllowedInstances(*tools.APIRequest, ...string) {}
func (f *FakeResource) VerifyAuth(string, *tools.APIRequest) bool { return true }
func (f *FakeResource) SetAllowedInstances(req *tools.APIRequest, instance_id ...string) []resources.ResourceInstanceITF {
return nil
}
func (f *FakeResource) ConvertToPricedResource(t tools.DataType, a *int, b *int, c *int, d *int, e *int, req *tools.APIRequest) (pricing.PricedItemITF, error) {
return nil, nil
}
func (f *FakeResource) VerifyAuth(string, *tools.APIRequest) bool { return true }
func TestNewAccessor_ReturnsValid(t *testing.T) {
acc := resources.NewAccessor[*FakeResource](tools.COMPUTE_RESOURCE, &tools.APIRequest{}, func() utils.DBObject {

View File

@@ -96,7 +96,7 @@ func TestStorageResourcePricingStrategy_GetQuantity_Invalid(t *testing.T) {
func TestPricedStorageResource_GetPriceHT_NoProfiles(t *testing.T) {
res := &resources.PricedStorageResource{
PricedResource: resources.PricedResource{
PricedResource: resources.PricedResource[*resources.StorageResourcePricingProfile]{
ResourceID: "res-id",
},
}

View File

@@ -30,8 +30,9 @@ func (d *WorkflowResource) ClearEnv() utils.DBObject {
return d
}
func (w *WorkflowResource) SetAllowedInstances(request *tools.APIRequest, ids ...string) {
func (w *WorkflowResource) SetAllowedInstances(request *tools.APIRequest, ids ...string) []ResourceInstanceITF {
/* EMPTY */
return []ResourceInstanceITF{}
}
func (r *WorkflowResource) GetSelectedInstance(selected *int) ResourceInstanceITF {
@@ -39,7 +40,7 @@ func (r *WorkflowResource) GetSelectedInstance(selected *int) ResourceInstanceIT
}
func (w *WorkflowResource) ConvertToPricedResource(t tools.DataType, selectedInstance *int, selectedPartnership *int, selectedBuyingStrategy *int, selectedStrategy *int, selectedBookingModeIndex *int, request *tools.APIRequest) (pricing.PricedItemITF, error) {
return &PricedResource{
return &PricedResource[*pricing.ExploitPricingProfile[pricing.TimePricingStrategy]]{
Name: w.Name,
Logo: w.Logo,
ResourceID: w.UUID,

View File

@@ -176,6 +176,10 @@ type AbstractAccessor[T DBObject] struct {
NotImplemented []string
}
func (r *AbstractAccessor[T]) NewObj() DBObject {
return r.New()
}
func (r *AbstractAccessor[T]) ShouldVerifyAuth() bool {
return true
}

View File

@@ -32,7 +32,6 @@ func GenericStoreOne(data DBObject, a Accessor) (DBObject, int, error) {
if data.GetID() == "" {
data.GenerateID()
}
data.SetID(data.GetID())
data.StoreDraftDefault()
data.UpToDate(a.GetUser(), a.GetPeerID(), true)
data.Unsign()
@@ -70,6 +69,12 @@ func GenericStoreOne(data DBObject, a Accessor) (DBObject, int, error) {
// GenericLoadOne loads one object from the database (generic)
func GenericDeleteOne(id string, a Accessor) (DBObject, int, error) {
res, code, err := a.LoadOne(id)
if err != nil {
return res, code, err
}
if res == nil {
return res, code, errors.New("not found")
}
if !res.CanDelete() {
return nil, 403, errors.New("you are not allowed to delete :" + a.GetType().String())
}
@@ -92,7 +97,7 @@ func ModelGenericUpdateOne(change map[string]interface{}, id string, a Accessor)
if err != nil {
return nil, nil, c, err
}
obj := &AbstractObject{}
obj := a.NewObj()
b, _ := json.Marshal(r)
json.Unmarshal(b, obj)
ok, r := r.CanUpdate(obj)

View File

@@ -40,6 +40,7 @@ type DBObject interface {
// Accessor is an interface that defines the basic methods for an Accessor
type Accessor interface {
NewObj() DBObject
GetUser() string
GetPeerID() string
GetGroups() []string

View File

@@ -1,8 +1,6 @@
package graph
import (
"time"
"cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/tools"
)
@@ -67,46 +65,32 @@ func (wf *Graph) IsWorkflow(item GraphItem) bool {
return item.Workflow != nil
}
func (g *Graph) GetAverageTimeRelatedToProcessingActivity(start time.Time, processings []*resources.ProcessingResource, resource resources.ResourceInterface,
func (g *Graph) GetAverageTimeRelatedToProcessingActivity(processings []*resources.ProcessingResource, resource resources.ResourceInterface,
f func(GraphItem) resources.ResourceInterface, instance int, partnership int, buying int, strategy int, bookingMode int, request *tools.APIRequest) (float64, float64, error) {
nearestStart := float64(10000000000)
oneIsInfinite := false
longestDuration := float64(0)
for _, link := range g.Links {
for _, processing := range processings {
var source string // source is the source of the link
if link.Destination.ID == processing.GetID() && f(g.Items[link.Source.ID]) != nil && f(g.Items[link.Source.ID]).GetID() == resource.GetID() { // if the destination is the processing and the source is not a compute
source = link.Source.ID
} else if link.Source.ID == processing.GetID() && f(g.Items[link.Source.ID]) != nil && f(g.Items[link.Source.ID]).GetID() == resource.GetID() { // if the source is the processing and the destination is not a compute
source = link.Destination.ID
if !(link.Destination.ID == processing.GetID() && f(g.Items[link.Source.ID]) != nil && f(g.Items[link.Source.ID]).GetID() == resource.GetID()) &&
!(link.Source.ID == processing.GetID() && f(g.Items[link.Source.ID]) != nil && f(g.Items[link.Source.ID]).GetID() == resource.GetID()) {
continue
}
priced, err := processing.ConvertToPricedResource(tools.PROCESSING_RESOURCE, &instance, &partnership, &buying, &strategy, &bookingMode, request)
if err != nil {
return 0, 0, err
}
if source != "" {
if priced.GetLocationStart() != nil {
near := float64(priced.GetLocationStart().Sub(start).Seconds())
if near < nearestStart {
nearestStart = near
}
}
if priced.GetLocationEnd() != nil {
duration := float64(priced.GetLocationEnd().Sub(*priced.GetLocationStart()).Seconds())
if longestDuration < duration {
longestDuration = duration
}
} else {
oneIsInfinite = true
}
duration := priced.GetExplicitDurationInS()
if duration < 0 {
oneIsInfinite = true
} else if longestDuration < duration {
longestDuration = duration
}
}
}
if oneIsInfinite {
return nearestStart, -1, nil
return 0, -1, nil
}
return nearestStart, longestDuration, nil
return 0, longestDuration, nil
}
/*
@@ -155,7 +139,7 @@ func (g *Graph) GetAverageTimeProcessingBeforeStart(average float64, processingI
func (g *Graph) GetResource(id string) (tools.DataType, resources.ResourceInterface) {
if item, ok := g.Items[id]; ok {
if item.Data != nil {
if item.NativeTool != nil {
return tools.NATIVE_TOOL, item.NativeTool
} else if item.Data != nil {
return tools.DATA_RESOURCE, item.Data

282
models/workflow/plantuml.go Normal file
View File

@@ -0,0 +1,282 @@
package workflow
import (
"encoding/json"
"fmt"
"sort"
"strconv"
"strings"
"cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/models/workflow/graph"
)
// ---------------------------------------------------------------------------
// PlantUML export
// ---------------------------------------------------------------------------
// plantUMLProcedures defines !procedure blocks for each resource type.
// Parameters use the $var/$name convention of PlantUML preprocessor v2.
// Calls are written WITHOUT inline comments (comment on the following line)
// to avoid the "assumed sequence diagram" syntax error.
const plantUMLProcedures = `!procedure Processing($var, $name)
component "$name" as $var <<Processing>>
!endprocedure
!procedure Data($var, $name)
file "$name" as $var <<Data>>
!endprocedure
!procedure Storage($var, $name)
database "$name" as $var <<Storage>>
!endprocedure
!procedure ComputeUnit($var, $name)
node "$name" as $var <<ComputeUnit>>
!endprocedure
!procedure WorkflowEvent($var, $name)
usecase "$name" as $var <<WorkflowEvent>>
!endprocedure
!procedure Workflow($var, $name)
frame "$name" as $var <<Workflow>>
!endprocedure
`
// ToPlantUML serializes the workflow graph to a valid, renderable PlantUML file
// that is also compatible with ExtractFromPlantUML (round-trip).
// Resource and instance attributes are written as human-readable comments:
//
// Processing(p1, "NDVI") ' access.container.image: myrepo/ndvi:1.2, infrastructure: 0
func (w *Workflow) ToPlantUML() string {
var sb strings.Builder
sb.WriteString("@startuml\n\n")
sb.WriteString(plantUMLProcedures)
sb.WriteByte('\n')
varNames := plantUMLVarNames(w.Graph.Items)
// --- resource declarations ---
for id, item := range w.Graph.Items {
if line := plantUMLItemLine(varNames[id], item); line != "" {
sb.WriteString(line + "\n")
}
}
sb.WriteByte('\n')
// --- links ---
for _, link := range w.Graph.Links {
src := varNames[link.Source.ID]
dst := varNames[link.Destination.ID]
if src == "" || dst == "" {
continue
}
sb.WriteString(fmt.Sprintf("%s --> %s\n", src, dst))
if comment := plantUMLLinkComment(link); comment != "" {
sb.WriteString("' " + comment + "\n")
}
}
sb.WriteString("\n@enduml\n")
return sb.String()
}
// plantUMLVarNames assigns short, deterministic variable names to each graph
// item (d1, d2, p1, s1, c1, e1, wf1 …).
func plantUMLVarNames(items map[string]graph.GraphItem) map[string]string {
counters := map[string]int{}
varNames := map[string]string{}
// Sort IDs for deterministic output
ids := make([]string, 0, len(items))
for id := range items {
ids = append(ids, id)
}
sort.Strings(ids)
for _, id := range ids {
prefix := plantUMLPrefix(items[id])
counters[prefix]++
varNames[id] = fmt.Sprintf("%s%d", prefix, counters[prefix])
}
return varNames
}
func plantUMLPrefix(item graph.GraphItem) string {
switch {
case item.NativeTool != nil:
return "e"
case item.Data != nil:
return "d"
case item.Processing != nil:
return "p"
case item.Storage != nil:
return "s"
case item.Compute != nil:
return "c"
case item.Workflow != nil:
return "wf"
}
return "u"
}
// plantUMLItemLine builds the PlantUML declaration line for one graph item.
func plantUMLItemLine(varName string, item graph.GraphItem) string {
switch {
case item.NativeTool != nil:
// WorkflowEvent has no instance and no configurable attributes.
return fmt.Sprintf("WorkflowEvent(%s, \"%s\")", varName, item.NativeTool.GetName())
case item.Data != nil:
return plantUMLResourceLine("Data", varName, item.Data)
case item.Processing != nil:
return plantUMLResourceLine("Processing", varName, item.Processing)
case item.Storage != nil:
return plantUMLResourceLine("Storage", varName, item.Storage)
case item.Compute != nil:
return plantUMLResourceLine("ComputeUnit", varName, item.Compute)
case item.Workflow != nil:
return plantUMLResourceLine("Workflow", varName, item.Workflow)
}
return ""
}
func plantUMLResourceLine(macro, varName string, res resources.ResourceInterface) string {
decl := fmt.Sprintf("%s(%s, \"%s\")", macro, varName, res.GetName())
if comment := plantUMLResourceComment(res); comment != "" {
// Comment on the line AFTER the declaration. ExtractFromPlantUML uses
// look-ahead to merge it back. No inline comment = no !procedure conflict.
return decl + "\n' " + comment
}
return decl
}
// plantUMLResourceComment merges resource-level fields with the first instance
// fields (instance overrides resource) and formats them as human-readable pairs.
func plantUMLResourceComment(res resources.ResourceInterface) string {
m := plantUMLToFlatMap(res)
if inst := res.GetSelectedInstance(nil); inst != nil {
for k, v := range plantUMLToFlatMap(inst) {
m[k] = v
}
}
return plantUMLFlatMapToComment(m)
}
// plantUMLLinkComment serializes StorageLinkInfos (first entry) as flat
// human-readable pairs prefixed with "storage_link_infos.".
func plantUMLLinkComment(link graph.GraphLink) string {
if len(link.StorageLinkInfos) == 0 {
return ""
}
infoFlat := plantUMLToFlatMap(link.StorageLinkInfos[0])
prefixed := make(map[string]string, len(infoFlat))
for k, v := range infoFlat {
prefixed["storage_link_infos."+k] = v
}
return plantUMLFlatMapToComment(prefixed)
}
// ---------------------------------------------------------------------------
// Flat-map helpers (shared by import & export)
// ---------------------------------------------------------------------------
// plantUMLSkipFields lists JSON field names (root keys) that must never appear
// in human-readable comments. All names are the actual JSON tags, not Go field names.
var plantUMLSkipFields = map[string]bool{
// AbstractObject — identity & audit (json tags)
"id": true, "name": true, "is_draft": true, "access_mode": true, "signature": true,
"creator_id": true, "user_creator_id": true,
"creation_date": true, "update_date": true,
"updater_id": true, "user_updater_id": true,
// internal resource type identifier (AbstractResource.Type / GetType())
"type": true,
// relationships / pricing
"instances": true, "partnerships": true,
"allowed_booking_modes": true, "usage_restrictions": true,
// display / admin
"logo": true, "description": true, "short_description": true, "owners": true,
// runtime params
"env": true, "inputs": true, "outputs": true,
// NativeTool internals
"kind": true, "params": true,
}
// zeroTimeStr is the JSON representation of Go's zero time.Time value.
// encoding/json does not treat it as "empty" for omitempty, so we filter it explicitly.
const zeroTimeStr = "0001-01-01T00:00:00Z"
// plantUMLToFlatMap marshals v to JSON and flattens the resulting object into
// a map[string]string using dot notation for nested keys, skipping zero values
// and known meta fields.
func plantUMLToFlatMap(v interface{}) map[string]string {
b, err := json.Marshal(v)
if err != nil {
return nil
}
var raw map[string]interface{}
if err := json.Unmarshal(b, &raw); err != nil {
return nil
}
result := map[string]string{}
plantUMLFlattenJSON(raw, "", result)
return result
}
// plantUMLFlattenJSON recursively walks a JSON object and writes scalar leaf
// values into result using dot-notation keys.
func plantUMLFlattenJSON(m map[string]interface{}, prefix string, result map[string]string) {
for k, v := range m {
fullKey := k
if prefix != "" {
fullKey = prefix + "." + k
}
// Skip fields whose root key is in the deny-list
if plantUMLSkipFields[strings.SplitN(fullKey, ".", 2)[0]] {
continue
}
switch val := v.(type) {
case map[string]interface{}:
plantUMLFlattenJSON(val, fullKey, result)
case []interface{}:
// Arrays are not representable in flat human-readable format; skip.
case float64:
if val != 0 {
if val == float64(int64(val)) {
result[fullKey] = strconv.FormatInt(int64(val), 10)
} else {
result[fullKey] = strconv.FormatFloat(val, 'f', -1, 64)
}
}
case bool:
if val {
result[fullKey] = "true"
}
case string:
if val != "" && val != zeroTimeStr {
result[fullKey] = val
}
}
}
}
// plantUMLFlatMapToComment converts a flat map to a sorted "key: value, …" string.
func plantUMLFlatMapToComment(m map[string]string) string {
if len(m) == 0 {
return ""
}
keys := make([]string, 0, len(m))
for k := range m {
keys = append(keys, k)
}
sort.Strings(keys)
parts := make([]string, 0, len(keys))
for _, k := range keys {
parts = append(parts, k+": "+m[k])
}
return strings.Join(parts, ", ")
}

View File

@@ -6,16 +6,20 @@ import (
"errors"
"fmt"
"mime/multipart"
"strconv"
"strings"
"time"
"cloud.o-forge.io/core/oc-lib/models/booking"
"cloud.o-forge.io/core/oc-lib/models/booking/planner"
"cloud.o-forge.io/core/oc-lib/models/collaborative_area/shallow_collaborative_area"
"cloud.o-forge.io/core/oc-lib/models/common"
"cloud.o-forge.io/core/oc-lib/models/common/models"
"cloud.o-forge.io/core/oc-lib/models/common/pricing"
"cloud.o-forge.io/core/oc-lib/models/live"
"cloud.o-forge.io/core/oc-lib/models/peer"
"cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/models/resources/native_tools"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/models/workflow/graph"
"cloud.o-forge.io/core/oc-lib/tools"
@@ -137,41 +141,82 @@ func (d *Workflow) ExtractFromPlantUML(plantUML multipart.File, request *tools.A
},
}
},
// WorkflowEvent creates a NativeTool of Kind=WORKFLOW_EVENT directly,
// without DB lookup. It has no user-defined instance.
"WorkflowEvent": func() resources.ResourceInterface {
return &resources.NativeTool{
Kind: int(native_tools.WORKFLOW_EVENT),
}
},
}
graphVarName := map[string]*graph.GraphItem{}
scanner := bufio.NewScanner(plantUML)
graphVarName := map[string]graph.GraphItem{}
// Collect all lines first to support look-ahead (comment on the line after
// the declaration, as produced by ToPlantUML).
scanner := bufio.NewScanner(plantUML)
var lines []string
for scanner.Scan() {
line := scanner.Text()
lines = append(lines, scanner.Text())
}
if err := scanner.Err(); err != nil {
return d, err
}
for i, line := range lines {
trimmed := strings.TrimSpace(line)
// Skip pure comment lines and PlantUML directives — they must never be
// parsed as resource declarations or links. Without this guard, a comment
// like "' source: http://my-server.com" would match the "-" link check.
if strings.HasPrefix(trimmed, "'") ||
strings.HasPrefix(trimmed, "!") ||
strings.HasPrefix(trimmed, "@") ||
trimmed == "" {
continue
}
// Build the parse line: if the current line has no inline comment and the
// next line is a pure comment, append it so parsers receive one combined line.
// Also handles the legacy inline-comment format unchanged.
parseLine := line
if !strings.Contains(line, "'") && i+1 < len(lines) {
if next := strings.TrimSpace(lines[i+1]); strings.HasPrefix(next, "'") {
parseLine = line + " " + next
}
}
for n, new := range resourceCatalog {
if strings.Contains(line, n+"(") && !strings.Contains(line, "!procedure") { // should exclude declaration of type.
if strings.Contains(line, n+"(") && !strings.Contains(line, "!procedure") && !strings.Contains(line, "!define") { // exclude macro declarations
newRes := new()
varName, graphItem, err := d.extractResourcePlantUML(line, newRes, n, request.PeerID)
newRes.SetID(uuid.New().String())
varName, graphItem, err := d.extractResourcePlantUML(parseLine, newRes, n, request.PeerID)
if err != nil {
return d, err
}
graphVarName[varName] = graphItem
if graphItem != nil {
graphVarName[varName] = *graphItem
}
continue
} else if strings.Contains(line, n+"-->") {
err := d.extractLink(line, graphVarName, "-->", false)
} else if strings.Contains(line, "-->") {
err := d.extractLink(parseLine, graphVarName, "-->", false)
if err != nil {
fmt.Println(err)
continue
}
} else if strings.Contains(line, n+"<--") {
err := d.extractLink(line, graphVarName, "<--", true)
} else if strings.Contains(line, "<--") {
err := d.extractLink(parseLine, graphVarName, "<--", true)
if err != nil {
fmt.Println(err)
continue
}
} else if strings.Contains(line, n+"--") {
err := d.extractLink(line, graphVarName, "--", false)
} else if strings.Contains(line, "--") {
err := d.extractLink(parseLine, graphVarName, "--", false)
if err != nil {
fmt.Println(err)
continue
}
} else if strings.Contains(line, n+"-") {
err := d.extractLink(line, graphVarName, "-", false)
} else if strings.Contains(line, "-") {
err := d.extractLink(parseLine, graphVarName, "-", false)
if err != nil {
fmt.Println(err)
continue
@@ -179,39 +224,104 @@ func (d *Workflow) ExtractFromPlantUML(plantUML multipart.File, request *tools.A
}
}
}
if err := scanner.Err(); err != nil {
return d, err
}
d.generateResource(d.GetResources(tools.DATA_RESOURCE), request)
d.generateResource(d.GetResources(tools.PROCESSING_RESOURCE), request)
d.generateResource(d.GetResources(tools.STORAGE_RESOURCE), request)
d.generateResource(d.GetResources(tools.COMPUTE_RESOURCE), request)
d.generateResource(d.GetResources(tools.WORKFLOW_RESOURCE), request)
d.Graph.Items = graphVarName
return d, nil
}
func (d *Workflow) generateResource(datas []resources.ResourceInterface, request *tools.APIRequest) error {
for _, d := range datas {
access := d.GetAccessor(request)
if _, code, err := access.LoadOne(d.GetID()); err != nil && code == 200 {
if d.GetType() == tools.COMPUTE_RESOURCE.String() {
access := live.NewAccessor[*live.LiveDatacenter](tools.LIVE_DATACENTER, request)
if b, err := json.Marshal(d); err == nil {
var liv live.LiveDatacenter
json.Unmarshal(b, &liv)
data, _, err := access.StoreOne(&liv)
if err == nil {
access.CopyOne(data)
}
}
continue
} else if d.GetType() == tools.STORAGE_RESOURCE.String() {
access := live.NewAccessor[*live.LiveStorage](tools.LIVE_STORAGE, request)
if b, err := json.Marshal(d); err == nil {
var liv live.LiveStorage
json.Unmarshal(b, &liv)
data, _, err := access.StoreOne(&liv)
if err == nil {
access.CopyOne(data)
}
}
continue
}
access.StoreOne(d)
d.GetAccessor(request).StoreOne(d)
}
return nil
}
func (d *Workflow) extractLink(line string, graphVarName map[string]*graph.GraphItem, pattern string, reverse bool) error {
// setNestedKey sets a value in a nested map using dot-notation path.
// "access.container.image" → m["access"]["container"]["image"] = value
func setNestedKey(m map[string]any, path string, value any) {
parts := strings.SplitN(path, ".", 2)
if len(parts) == 1 {
m[path] = value
return
}
key, rest := parts[0], parts[1]
if _, ok := m[key]; !ok {
m[key] = map[string]any{}
}
if sub, ok := m[key].(map[string]any); ok {
setNestedKey(sub, rest, value)
}
}
// parseHumanFriendlyAttrs converts a human-friendly comment into JSON bytes.
// Supports:
// - flat: "source: http://example.com, encryption: true, size: 500"
// - nested: "access.container.image: nginx, access.container.tag: latest"
// - raw JSON passthrough (backward-compat): '{"key": "value"}'
//
// Values are auto-typed: bool, float64, or string.
// Note: the first ':' in each pair is the key/value separator,
// so URLs like "http://..." are handled correctly.
func parseHumanFriendlyAttrs(comment string) []byte {
comment = strings.TrimSpace(comment)
if strings.HasPrefix(comment, "{") {
return []byte(comment)
}
m := map[string]any{}
for _, pair := range strings.Split(comment, ",") {
pair = strings.TrimSpace(pair)
parts := strings.SplitN(pair, ":", 2)
if len(parts) != 2 {
continue
}
key := strings.TrimSpace(parts[0])
val := strings.TrimSpace(parts[1])
var typed any
if b, err := strconv.ParseBool(val); err == nil {
typed = b
} else if n, err := strconv.ParseFloat(val, 64); err == nil {
typed = n
} else {
typed = val
}
setNestedKey(m, key, typed)
}
b, _ := json.Marshal(m)
return b
}
func (d *Workflow) extractLink(line string, graphVarName map[string]graph.GraphItem, pattern string, reverse bool) error {
splitted := strings.Split(line, pattern)
if len(splitted) < 2 {
return errors.New("links elements not found")
}
if graphVarName[splitted[0]] != nil {
return errors.New("links elements not found -> " + strings.Trim(splitted[0], " "))
}
if graphVarName[splitted[1]] != nil {
return errors.New("links elements not found -> " + strings.Trim(splitted[1], " "))
}
link := &graph.GraphLink{
Source: graph.Position{
ID: graphVarName[splitted[0]].ID,
@@ -230,11 +340,10 @@ func (d *Workflow) extractLink(line string, graphVarName map[string]*graph.Graph
link.Source = tmp
}
splittedComments := strings.Split(line, "'")
if len(splittedComments) <= 1 {
return errors.New("Can't deserialize Object, there's no commentary")
if len(splittedComments) > 1 {
comment := strings.ReplaceAll(splittedComments[1], "'", "")
json.Unmarshal(parseHumanFriendlyAttrs(comment), link)
}
comment := strings.ReplaceAll(splittedComments[1], "'", "") // for now it's a json.
json.Unmarshal([]byte(comment), link)
d.Graph.Links = append(d.Graph.Links, *link)
return nil
}
@@ -245,7 +354,7 @@ func (d *Workflow) extractResourcePlantUML(line string, resource resources.Resou
return "", nil, errors.New("Can't deserialize Object, there's no func")
}
splittedParams := strings.Split(splittedFunc[1], ",")
if len(splittedFunc) <= 1 {
if len(splittedParams) <= 1 {
return "", nil, errors.New("Can't deserialize Object, there's no params")
}
@@ -255,32 +364,40 @@ func (d *Workflow) extractResourcePlantUML(line string, resource resources.Resou
if len(splitted) <= 1 {
return "", nil, errors.New("Can't deserialize Object, there's no name")
}
resource.SetName(splitted[1])
resource.SetName(strings.ReplaceAll(splitted[1], "\\n", " "))
splittedComments := strings.Split(line, "'")
if len(splittedComments) <= 1 {
return "", nil, errors.New("Can't deserialize Object, there's no commentary")
}
comment := strings.ReplaceAll(splittedComments[1], "'", "") // for now it's a json.
// Resources with instances get a default one seeded from the parent resource,
// then overridden by any explicit comment attributes.
// Event (NativeTool) has no instance: getNewInstance returns nil and is skipped.
instance := d.getNewInstance(dataName, splitted[1], peerID)
if instance == nil {
return "", nil, errors.New("No instance found.")
if instance != nil {
if b, err := json.Marshal(resource); err == nil {
json.Unmarshal(b, instance)
}
splittedComments := strings.Split(line, "'")
if len(splittedComments) > 1 {
comment := strings.ReplaceAll(splittedComments[1], "'", "")
json.Unmarshal(parseHumanFriendlyAttrs(comment), instance)
}
resource.AddInstances(instance)
}
resource.AddInstances(instance)
json.Unmarshal([]byte(comment), instance)
// deserializer les instances... une instance doit par défaut avoir certaines valeurs d'accès.
graphID := uuid.New()
graphItem := &graph.GraphItem{
ID: graphID.String(),
item := d.getNewGraphItem(dataName, resource)
if item != nil {
d.Graph.Items[item.ID] = *item
}
graphItem = d.getNewGraphItem(dataName, graphItem, resource)
d.Graph.Items[graphID.String()] = *graphItem
return varName, graphItem, nil
return varName, item, nil
}
func (d *Workflow) getNewGraphItem(dataName string, graphItem *graph.GraphItem, resource resources.ResourceInterface) *graph.GraphItem {
func (d *Workflow) getNewGraphItem(dataName string, resource resources.ResourceInterface) *graph.GraphItem {
if resource == nil {
return nil
}
graphItem := &graph.GraphItem{
ID: uuid.New().String(),
ItemResource: &resources.ItemResource{},
}
switch dataName {
case "Data":
d.Datas = append(d.Datas, resource.GetID())
@@ -290,15 +407,13 @@ func (d *Workflow) getNewGraphItem(dataName string, graphItem *graph.GraphItem,
d.Processings = append(d.Processings, resource.GetID())
d.ProcessingResources = append(d.ProcessingResources, resource.(*resources.ProcessingResource))
graphItem.Processing = resource.(*resources.ProcessingResource)
case "Event":
access := resources.NewAccessor[*resources.NativeTool](tools.NATIVE_TOOL, &tools.APIRequest{
Admin: true,
}, func() utils.DBObject { return &resources.NativeTool{} })
t, _, err := access.Search(nil, "WORKFLOW_EVENT", false)
if err == nil && len(t) > 0 {
d.NativeTool = append(d.NativeTool, t[0].GetID())
graphItem.NativeTool = t[0].(*resources.NativeTool)
}
case "WorkflowEvent":
// The resource is already a *NativeTool with Kind=WORKFLOW_EVENT set by the
// catalog factory. We use it directly without any DB lookup.
nt := resource.(*resources.NativeTool)
nt.Name = native_tools.WORKFLOW_EVENT.String()
d.NativeTool = append(d.NativeTool, nt.GetID())
graphItem.NativeTool = nt
case "Storage":
d.Storages = append(d.Storages, resource.GetID())
d.StorageResources = append(d.StorageResources, resource.(*resources.StorageResource))
@@ -308,7 +423,7 @@ func (d *Workflow) getNewGraphItem(dataName string, graphItem *graph.GraphItem,
d.ComputeResources = append(d.ComputeResources, resource.(*resources.ComputeResource))
graphItem.Compute = resource.(*resources.ComputeResource)
default:
return graphItem
return nil
}
return graphItem
}
@@ -480,8 +595,36 @@ func (wfa *Workflow) CheckBooking(caller *tools.HTTPCaller) (bool, error) {
return true, nil
}
func (wf *Workflow) Planify(start time.Time, end *time.Time, instances ConfigItem, partnerships ConfigItem, buyings ConfigItem, strategies ConfigItem, bookingMode int, request *tools.APIRequest) (bool, float64, map[tools.DataType]map[string]pricing.PricedItemITF, *Workflow, error) {
// preemptDelay is the minimum lead time granted before a preempted booking starts.
const preemptDelay = 30 * time.Second
// Planify computes the scheduled start/end for every resource in the workflow.
//
// bookingMode controls availability checking when p (a live planner snapshot) is provided:
// - PREEMPTED : start from now+preemptDelay regardless of existing load.
// - WHEN_POSSIBLE: start from max(now, start); if a slot conflicts, slide to the next free window.
// - PLANNED : use start as-is; return an error if the slot is not available.
//
// Passing p = nil skips all availability checks (useful for sub-workflow recursion).
func (wf *Workflow) Planify(start time.Time, end *time.Time, instances ConfigItem, partnerships ConfigItem, buyings ConfigItem, strategies ConfigItem, bookingMode int, p planner.PlannerITF, request *tools.APIRequest) (bool, float64, map[tools.DataType]map[string]pricing.PricedItemITF, *Workflow, error) {
// 1. Adjust global start based on booking mode.
now := time.Now()
switch booking.BookingMode(bookingMode) {
case booking.PREEMPTED:
if earliest := now.Add(preemptDelay); start.Before(earliest) {
start = earliest
}
case booking.WHEN_POSSIBLE:
if start.Before(now) {
start = now
}
// PLANNED: honour the caller's start date as-is.
}
priceds := map[tools.DataType]map[string]pricing.PricedItemITF{}
var err error
// 2. Plan processings first so we can derive the total workflow duration.
ps, priceds, err := plan[*resources.ProcessingResource](tools.PROCESSING_RESOURCE, instances, partnerships, buyings, strategies, bookingMode, wf, priceds, request, wf.Graph.IsProcessing,
func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64, error) {
d, err := wf.Graph.GetAverageTimeProcessingBeforeStart(0, res.GetID(),
@@ -492,12 +635,17 @@ func (wf *Workflow) Planify(start time.Time, end *time.Time, instances ConfigIte
}
return start.Add(time.Duration(d) * time.Second), priced.GetExplicitDurationInS(), nil
}, func(started time.Time, duration float64) (*time.Time, error) {
s := started.Add(time.Duration(duration))
s := started.Add(time.Duration(duration) * time.Second)
return &s, nil
})
if err != nil {
return false, 0, priceds, nil, err
}
// Total workflow duration used as the booking window for compute/storage.
// Returns -1 if any processing is a service (open-ended).
workflowDuration := common.GetPlannerLongestTime(priceds)
if _, priceds, err = plan[resources.ResourceInterface](tools.NATIVE_TOOL, instances, partnerships, buyings, strategies, bookingMode, wf, priceds, request,
wf.Graph.IsNativeTool, func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64, error) {
return start, 0, nil
@@ -514,11 +662,13 @@ func (wf *Workflow) Planify(start time.Time, end *time.Time, instances ConfigIte
}); err != nil {
return false, 0, priceds, nil, err
}
// 3. Compute/storage: duration = total workflow duration (conservative bound).
for k, f := range map[tools.DataType]func(graph.GraphItem) bool{tools.STORAGE_RESOURCE: wf.Graph.IsStorage,
tools.COMPUTE_RESOURCE: wf.Graph.IsCompute} {
if _, priceds, err = plan[resources.ResourceInterface](k, instances, partnerships, buyings, strategies, bookingMode, wf, priceds, request,
f, func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64, error) {
nearestStart, longestDuration, err := wf.Graph.GetAverageTimeRelatedToProcessingActivity(start, ps, res, func(i graph.GraphItem) (r resources.ResourceInterface) {
nearestStart, _, err := wf.Graph.GetAverageTimeRelatedToProcessingActivity(ps, res, func(i graph.GraphItem) (r resources.ResourceInterface) {
if f(i) {
_, r = i.GetResource()
}
@@ -526,27 +676,31 @@ func (wf *Workflow) Planify(start time.Time, end *time.Time, instances ConfigIte
}, *instances.Get(res.GetID()), *partnerships.Get(res.GetID()),
*buyings.Get(res.GetID()), *strategies.Get(res.GetID()), bookingMode, request)
if err != nil {
return start, longestDuration, err
return start, workflowDuration, err
}
return start.Add(time.Duration(nearestStart) * time.Second), longestDuration, nil
return start.Add(time.Duration(nearestStart) * time.Second), workflowDuration, nil
}, func(started time.Time, duration float64) (*time.Time, error) {
s := started.Add(time.Duration(duration))
if duration < 0 {
return nil, nil // service: open-ended booking
}
s := started.Add(time.Duration(duration) * time.Second)
return &s, nil
}); err != nil {
return false, 0, priceds, nil, err
}
}
longest := common.GetPlannerLongestTime(end, priceds, request)
longest := workflowDuration
if _, priceds, err = plan[resources.ResourceInterface](tools.WORKFLOW_RESOURCE, instances, partnerships, buyings, strategies,
bookingMode, wf, priceds, request, wf.Graph.IsWorkflow,
func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64, error) {
start := start.Add(time.Duration(common.GetPlannerNearestStart(start, priceds, request)) * time.Second)
start := start.Add(time.Duration(common.GetPlannerNearestStart(start, priceds)) * time.Second)
longest := float64(-1)
r, code, err := res.GetAccessor(request).LoadOne(res.GetID())
if code != 200 || err != nil {
return start, longest, err
}
_, neoLongest, priceds2, _, err := r.(*Workflow).Planify(start, end, instances, partnerships, buyings, strategies, bookingMode, request)
_, neoLongest, priceds2, _, err := r.(*Workflow).Planify(start, end, instances, partnerships, buyings, strategies, bookingMode, nil, request)
// should ... import priced
if err != nil {
return start, longest, err
@@ -564,13 +718,26 @@ func (wf *Workflow) Planify(start time.Time, end *time.Time, instances ConfigIte
}
}
return start.Add(time.Duration(common.GetPlannerNearestStart(start, priceds, request)) * time.Second), longest, nil
return start.Add(time.Duration(common.GetPlannerNearestStart(start, priceds)) * time.Second), longest, nil
}, func(start time.Time, longest float64) (*time.Time, error) {
s := start.Add(time.Duration(longest) * time.Second)
return &s, nil
}); err != nil {
return false, 0, priceds, nil, err
}
// 4. Availability check against the live planner (skipped for PREEMPTED and sub-workflows).
if p != nil && booking.BookingMode(bookingMode) != booking.PREEMPTED {
slide, err := plannerAvailabilitySlide(p, priceds, booking.BookingMode(bookingMode))
if err != nil {
return false, 0, priceds, nil, err
}
if slide > 0 {
// Re-plan from the corrected start; pass nil planner to avoid infinite recursion.
return wf.Planify(start.Add(slide), end, instances, partnerships, buyings, strategies, bookingMode, nil, request)
}
}
isPreemptible := true
for _, first := range wf.GetFirstItems() {
_, res := first.GetResource()
@@ -582,6 +749,36 @@ func (wf *Workflow) Planify(start time.Time, end *time.Time, instances ConfigIte
return isPreemptible, longest, priceds, wf, nil
}
// plannerAvailabilitySlide checks all compute/storage resources in priceds against the planner.
// For PLANNED mode it returns an error immediately on the first conflict.
// For WHEN_POSSIBLE it returns the maximum slide (duration to add to global start) needed to
// clear all conflicts, or 0 if the plan is already conflict-free.
func plannerAvailabilitySlide(p planner.PlannerITF, priceds map[tools.DataType]map[string]pricing.PricedItemITF, mode booking.BookingMode) (time.Duration, error) {
maxSlide := time.Duration(0)
for _, dt := range []tools.DataType{tools.COMPUTE_RESOURCE, tools.STORAGE_RESOURCE} {
for _, priced := range priceds[dt] {
locStart := priced.GetLocationStart()
locEnd := priced.GetLocationEnd()
if locStart == nil || locEnd == nil {
continue // open-ended: skip availability check
}
d := locEnd.Sub(*locStart)
next := p.NextAvailableStart(priced.GetID(), priced.GetInstanceID(), *locStart, d)
slide := next.Sub(*locStart)
if slide <= 0 {
continue
}
if mode == booking.PLANNED {
return 0, errors.New("requested slot is not available for resource " + priced.GetID())
}
if slide > maxSlide {
maxSlide = slide
}
}
}
return maxSlide, nil
}
// Returns a map of DataType (processing,computing,data,storage,worfklow) where each resource (identified by its UUID)
// is mapped to the list of its items (different appearance) in the graph
// ex: if the same Minio storage is represented by several nodes in the graph, in [tools.STORAGE_RESSOURCE] its UUID will be mapped to
@@ -642,9 +839,6 @@ func plan[T resources.ResourceInterface](
priced.SetLocationEnd(*e)
}
}
if e, err := end(started, priced.GetExplicitDurationInS()); err != nil && e != nil {
priced.SetLocationEnd(*e)
}
resources = append(resources, realItem.(T))
if priceds[dt][item.ID] != nil {
priced.AddQuantity(priceds[dt][item.ID].GetQuantity())

View File

@@ -43,8 +43,8 @@ type WorkflowExecution struct {
}
func (r *WorkflowExecution) StoreDraftDefault() {
r.IsDraft = false // TODO: TEMPORARY
r.State = enum.SCHEDULED
r.IsDraft = true
r.State = enum.DRAFT
}
func (r *WorkflowExecution) CanUpdate(set utils.DBObject) (bool, utils.DBObject) {
@@ -65,7 +65,7 @@ func (wfa *WorkflowExecution) Equals(we *WorkflowExecution) bool {
func (ws *WorkflowExecution) PurgeDraft(request *tools.APIRequest) error {
if ws.EndDate == nil {
// if no end... then Book like a savage
e := ws.ExecDate.Add(time.Hour)
e := ws.ExecDate.UTC().Add(time.Hour)
ws.EndDate = &e
}
accessor := ws.GetAccessor(request)
@@ -132,7 +132,6 @@ func (d *WorkflowExecution) Buy(bs pricing.BillingStrategy, executionsID string,
purchases = append(purchases, d.buyEach(bs, executionsID, wfID, tools.DATA_RESOURCE, priceds[tools.DATA_RESOURCE])...)
d.PurchasesState = map[string]bool{}
for _, p := range purchases {
p.SetID(uuid.NewString())
d.PurchasesState[p.GetID()] = false
}
return purchases
@@ -154,7 +153,7 @@ func (d *WorkflowExecution) buyEach(bs pricing.BillingStrategy, executionsID str
d.PeerBuyByGraph[priced.GetCreatorID()][itemID] = []string{}
}
start := d.ExecDate
if s := priced.GetLocationStart(); s != nil {
if s := priced.GetLocationStart(); s != nil && s.After(time.Now()) {
start = *s
}
var m map[string]interface{}
@@ -163,8 +162,9 @@ func (d *WorkflowExecution) buyEach(bs pricing.BillingStrategy, executionsID str
end := start.Add(time.Duration(priced.GetExplicitDurationInS()) * time.Second)
bookingItem := &purchase_resource.PurchaseResource{
AbstractObject: utils.AbstractObject{
UUID: uuid.New().String(),
Name: d.GetName() + "_" + executionsID + "_" + wfID,
UUID: uuid.New().String(),
Name: d.GetName() + "_" + executionsID + "_" + wfID,
IsDraft: true,
},
PricedItem: m,
ExecutionID: d.GetID(),
@@ -188,7 +188,9 @@ func (d *WorkflowExecution) Book(executionsID string, wfID string, priceds map[t
booking = append(booking, d.bookEach(executionsID, wfID, tools.COMPUTE_RESOURCE, priceds[tools.COMPUTE_RESOURCE])...)
booking = append(booking, d.bookEach(executionsID, wfID, tools.DATA_RESOURCE, priceds[tools.DATA_RESOURCE])...)
for _, p := range booking {
p.SetID(uuid.NewString())
if d.BookingsState == nil {
d.BookingsState = map[string]bool{}
}
d.BookingsState[p.GetID()] = false
}
return booking
@@ -210,21 +212,32 @@ func (d *WorkflowExecution) bookEach(executionsID string, wfID string, dt tools.
d.PeerBookByGraph[priced.GetCreatorID()][itemID] = []string{}
}
start := d.ExecDate
if s := priced.GetLocationStart(); s != nil {
if s := priced.GetLocationStart(); s != nil && s.After(time.Now()) {
start = *s
}
end := start.Add(time.Duration(priced.GetExplicitDurationInS()) * time.Second)
// Prefer LocationEnd set by Planify; fall back to ExplicitDurationInS only
// when Planify did not compute an end (open-ended / service resources).
var endDate *time.Time
if locEnd := priced.GetLocationEnd(); locEnd != nil {
endDate = locEnd
} else if durationS := priced.GetExplicitDurationInS(); durationS > 0 {
e := start.Add(time.Duration(durationS) * time.Second)
endDate = &e
}
// durationS < 0 means the resource is a service (runs indefinitely):
// leave ExpectedEndDate nil so the booking is open-ended.
var m map[string]interface{}
b, _ := json.Marshal(priced)
json.Unmarshal(b, &m)
bookingItem := &booking.Booking{
AbstractObject: utils.AbstractObject{
UUID: uuid.New().String(),
Name: d.GetName() + "_" + executionsID + "_" + wfID,
UUID: uuid.New().String(),
Name: d.GetName() + "_" + executionsID + "_" + wfID,
IsDraft: true,
},
PricedItem: m,
ExecutionsID: executionsID,
State: enum.SCHEDULED,
State: enum.DRAFT,
ResourceID: priced.GetID(),
InstanceID: priced.GetInstanceID(),
ResourceType: dt,
@@ -232,7 +245,7 @@ func (d *WorkflowExecution) bookEach(executionsID string, wfID string, dt tools.
WorkflowID: wfID,
ExecutionID: d.GetID(),
ExpectedStartDate: start,
ExpectedEndDate: &end,
ExpectedEndDate: endDate,
}
items = append(items, bookingItem)
d.PeerBookByGraph[priced.GetCreatorID()][itemID] = append(

View File

@@ -24,7 +24,7 @@ func newShallowAccessor(request *tools.APIRequest) *WorkflowExecutionMongoAccess
Request: request,
Type: tools.WORKFLOW_EXECUTION,
New: func() *WorkflowExecution { return &WorkflowExecution{} },
NotImplemented: []string{"DeleteOne", "StoreOne", "CopyOne"},
NotImplemented: []string{"CopyOne"},
},
}
}
@@ -37,7 +37,7 @@ func NewAccessor(request *tools.APIRequest) *WorkflowExecutionMongoAccessor {
Request: request,
Type: tools.WORKFLOW_EXECUTION,
New: func() *WorkflowExecution { return &WorkflowExecution{} },
NotImplemented: []string{"DeleteOne", "StoreOne", "CopyOne"},
NotImplemented: []string{"CopyOne"},
},
}
}
@@ -52,7 +52,7 @@ func (wfa *WorkflowExecutionMongoAccessor) UpdateOne(set map[string]interface{},
}
func (a *WorkflowExecutionMongoAccessor) LoadOne(id string) (utils.DBObject, int, error) {
return utils.GenericLoadOne[*WorkflowExecution](id, a.New(), func(d utils.DBObject) (utils.DBObject, int, error) {
return utils.GenericLoadOne(id, a.New(), func(d utils.DBObject) (utils.DBObject, int, error) {
now := time.Now()
now = now.Add(time.Second * -60)
if d.(*WorkflowExecution).State == enum.DRAFT && !a.shallow && now.UTC().After(d.(*WorkflowExecution).ExecDate) {

View File

@@ -13,7 +13,6 @@ import (
func LoadKeyFromFilePrivate() (crypto.PrivKey, error) {
path := config.GetConfig().PrivateKeyPath
fmt.Println(path)
data, err := os.ReadFile(path)
if err != nil {
return nil, err

View File

@@ -31,6 +31,7 @@ const (
LIVE_STORAGE
BILL
NATIVE_TOOL
EXECUTION_VERIFICATION
)
var NOAPI = func() string {
@@ -54,6 +55,11 @@ var PEERSAPI = func() string {
var DATACENTERAPI = func() string {
return config.GetConfig().InternalDatacenterAPI
}
var SCHEDULERAPI = func() string {
return config.GetConfig().InternalSchedulerAPI
}
var PURCHASEAPI = func() string {
return config.GetConfig().InternalCatalogAPI + "/purchase"
}
@@ -72,7 +78,7 @@ var InnerDefaultAPI = [...]func() string{
PEERSAPI,
SHAREDAPI,
SHAREDAPI,
DATACENTERAPI,
SCHEDULERAPI,
NOAPI,
NOAPI,
NOAPI,
@@ -81,6 +87,7 @@ var InnerDefaultAPI = [...]func() string{
DATACENTERAPI,
NOAPI,
CATALOGAPI,
SCHEDULERAPI,
}
// Bind the standard data name to the data type
@@ -106,6 +113,7 @@ var Str = [...]string{
"live_storage",
"bill",
"native_tool",
"execution_verification",
}
func FromString(comp string) int {
@@ -163,6 +171,7 @@ const (
PB_CONSIDERS
PB_ADMIRALTY_CONFIG
PB_MINIO_CONFIG
PB_CLOSE_SEARCH
NONE
)
@@ -188,12 +197,15 @@ func GetActionString(ss string) PubSubAction {
return PB_ADMIRALTY_CONFIG
case "minio_config":
return PB_MINIO_CONFIG
case "close_search":
return PB_CLOSE_SEARCH
default:
return NONE
}
}
var path = []string{"search", "search_response", "create", "update", "delete", "planner", "close_planner", "considers", "admiralty_config", "minio_config"}
var path = []string{"search", "search_response", "create", "update", "delete", "planner", "close_planner",
"considers", "admiralty_config", "minio_config", "close_search"}
func (m PubSubAction) String() string {
return strings.ToUpper(path[m])

View File

@@ -35,12 +35,16 @@ type KubernetesService struct {
}
func NewDynamicClient(host string, ca string, cert string, data string) (*dynamic.DynamicClient, error) {
decodedCa, _ := base64.StdEncoding.DecodeString(ca)
decodedCert, _ := base64.StdEncoding.DecodeString(cert)
decodedKey, _ := base64.StdEncoding.DecodeString(data)
config := &rest.Config{
Host: host,
TLSClientConfig: rest.TLSClientConfig{
CAData: []byte(ca),
CertData: []byte(cert),
KeyData: []byte(data),
CAData: []byte(decodedCa),
CertData: []byte(decodedCert),
KeyData: []byte(decodedKey),
},
}
@@ -56,18 +60,21 @@ func NewDynamicClient(host string, ca string, cert string, data string) (*dynami
}
func NewKubernetesService(host string, ca string, cert string, data string) (*KubernetesService, error) {
decodedCa, _ := base64.StdEncoding.DecodeString(ca)
decodedCert, _ := base64.StdEncoding.DecodeString(cert)
decodedKey, _ := base64.StdEncoding.DecodeString(data)
config := &rest.Config{
Host: host,
TLSClientConfig: rest.TLSClientConfig{
CAData: []byte(ca),
CertData: []byte(cert),
KeyData: []byte(data),
CAData: []byte(decodedCa),
CertData: []byte(decodedCert),
KeyData: []byte(decodedKey),
},
}
// Create clientset
clientset, err := kubernetes.NewForConfig(config)
fmt.Println("NewForConfig", clientset, err)
if err != nil {
return nil, errors.New("Error creating Kubernetes client: " + err.Error())
}
@@ -84,38 +91,6 @@ func NewKubernetesService(host string, ca string, cert string, data string) (*Ku
}, nil
}
func NewRemoteKubernetesService(url string, ca string, cert string, key string) (*KubernetesService, error) {
decodedCa, _ := base64.StdEncoding.DecodeString(ca)
decodedCert, _ := base64.StdEncoding.DecodeString(cert)
decodedKey, _ := base64.StdEncoding.DecodeString(key)
config := &rest.Config{
Host: url + ":6443",
TLSClientConfig: rest.TLSClientConfig{
CAData: decodedCa,
CertData: decodedCert,
KeyData: decodedKey,
},
}
// Create clientset
clientset, err := kubernetes.NewForConfig(config)
fmt.Println("NewForConfig", clientset, err)
if err != nil {
return nil, errors.New("Error creating Kubernetes client: " + err.Error())
}
if clientset == nil {
return nil, errors.New("Error creating Kubernetes client: clientset is nil")
}
return &KubernetesService{
Set: clientset,
Host: url,
CA: string(decodedCa),
Cert: string(decodedCert),
Data: string(decodedKey),
}, nil
}
func (k *KubernetesService) CreateNamespace(ctx context.Context, ns string) error {
// Define the namespace
fmt.Println("ExecutionID in CreateNamespace() : ", ns)
@@ -128,7 +103,7 @@ func (k *KubernetesService) CreateNamespace(ctx context.Context, ns string) erro
},
}
// Create the namespace
fmt.Println("Creating namespace...", k.Set)
fmt.Println("Creating namespace...")
if _, err := k.Set.CoreV1().Namespaces().Create(ctx, namespace, metav1.CreateOptions{}); err != nil {
return errors.New("Error creating namespace: " + err.Error())
}
@@ -208,6 +183,40 @@ func (k *KubernetesService) CreateRoleBinding(ctx context.Context, ns string, ro
return nil
}
// ProvisionExecutionNamespace creates the full Argo execution environment for a
// namespace: namespace, service-account, role and role-binding. Idempotent — if
// the namespace already exists the call is a no-op.
func (k *KubernetesService) ProvisionExecutionNamespace(ctx context.Context, ns string) error {
existing, _ := k.GetNamespace(ctx, ns)
if existing != nil {
return nil
}
if err := k.CreateNamespace(ctx, ns); err != nil && !strings.Contains(err.Error(), "already exists") {
return err
}
if err := k.CreateServiceAccount(ctx, ns); err != nil && !strings.Contains(err.Error(), "already exists") {
return err
}
role := "argo-role"
if err := k.CreateRole(ctx, ns, role,
[][]string{{"coordination.k8s.io"}, {""}, {""}},
[][]string{{"leases"}, {"secrets"}, {"pods"}},
[][]string{{"get", "create", "update"}, {"get"}, {"patch"}},
); err != nil {
return err
}
return k.CreateRoleBinding(ctx, ns, "argo-role-binding", role)
}
// TeardownExecutionNamespace deletes the namespace and lets Kubernetes cascade
// the deletion of all contained resources (SA, Role, RoleBinding, pods…).
func (k *KubernetesService) TeardownExecutionNamespace(ctx context.Context, ns string) error {
if err := k.Set.CoreV1().Namespaces().Delete(ctx, ns, metav1.DeleteOptions{}); err != nil {
return errors.New("error deleting namespace " + ns + ": " + err.Error())
}
return nil
}
func (k *KubernetesService) DeleteNamespace(ctx context.Context, ns string, f func()) error {
targetGVR := schema.GroupVersionResource{
Group: "multicluster.admiralty.io",
@@ -270,17 +279,12 @@ func (k *KubernetesService) GetTargets(ctx context.Context) ([]string, error) {
return nil, err
}
fmt.Println(string(resp))
var targetDict map[string]interface{}
err = json.Unmarshal(resp, &targetDict)
if err != nil {
fmt.Println("TODO: handle the error when unmarshalling k8s API response")
return nil, err
}
b, _ := json.MarshalIndent(targetDict, "", " ")
fmt.Println(string(b))
data := targetDict["items"].([]interface{})
for _, item := range data {
@@ -390,7 +394,6 @@ func (k *KubernetesService) CreateKubeconfigSecret(context context.Context, kube
// config, err := base64.RawStdEncoding.DecodeString(kubeconfig)
if err != nil {
fmt.Println("Error while encoding kubeconfig")
fmt.Println(err)
return nil, err
}
@@ -400,21 +403,6 @@ func (k *KubernetesService) CreateKubeconfigSecret(context context.Context, kube
"config": config,
},
)
// exists, err := k.GetKubeconfigSecret(context,executionId)
// if err != nil {
// fmt.Println("Error verifying if kube secret exists in namespace ", executionId)
// return nil, err
// }
// if exists != nil {
// fmt.Println("kube-secret already exists in namespace", executionId)
// fmt.Println("Overriding existing kube-secret with a newer resource")
// // TODO : implement DeleteKubeConfigSecret(executionID)
// deleted, err := k.DeleteKubeConfigSecret(executionId)
// _ = deleted
// _ = err
// }
resp, err := k.Set.CoreV1().
Secrets(executionId).
Apply(context,
@@ -425,14 +413,12 @@ func (k *KubernetesService) CreateKubeconfigSecret(context context.Context, kube
if err != nil {
fmt.Println("Error while trying to contact API to get secret kube-secret-" + executionId)
fmt.Println(err)
return nil, err
}
data, err := json.Marshal(resp)
if err != nil {
fmt.Println("Couldn't marshal resp from : ", data)
fmt.Println(err)
return nil, err
}
return data, nil
@@ -449,7 +435,6 @@ func (k *KubernetesService) GetKubeconfigSecret(context context.Context, executi
return nil, nil
}
fmt.Println("Error while trying to contact API to get secret kube-secret-" + executionId)
fmt.Println(err)
return nil, err
}
@@ -457,7 +442,6 @@ func (k *KubernetesService) GetKubeconfigSecret(context context.Context, executi
if err != nil {
fmt.Println("Couldn't marshal resp from : ", data)
fmt.Println(err)
return nil, err
}
@@ -512,15 +496,14 @@ func dynamicClientApply(host string, ca string, cert string, data string, execut
},
)
if err != nil {
o, err := json.Marshal(object)
fmt.Println("Error from k8s API when applying "+fmt.Sprint(string(o))+" to "+gvrSources.String()+" : ", err)
fmt.Println("Error from k8s API when applying "+fmt.Sprintf("%v", object)+" to "+gvrSources.String()+" : ", err)
return nil, err
}
// We can add more info to the log with the content of resp if not nil
resByte, err := json.Marshal(res)
if err != nil {
// fmt.Println("Error trying to create a Source on remote cluster : ", err , " : ", res)
fmt.Println("Error trying to create a Source on remote cluster : ", err, " : ", res)
return nil, err
}
@@ -578,7 +561,6 @@ func (k *KubernetesService) GetOneNode(context context.Context, executionID stri
)
if err != nil {
fmt.Println("Error getting the list of nodes from k8s API")
fmt.Println(err)
return nil, err
}

View File

@@ -28,7 +28,9 @@ type NATSMethod int
var meths = []string{"remove execution", "create execution", "planner execution", "discovery",
"workflow event", "argo kube event", "create resource", "remove resource",
"propalgation event", "search event",
"propalgation event", "search event", "confirm event",
"considers event", "admiralty config event", "minio config event",
"workflow started event", "workflow step done event", "workflow done event",
}
const (
@@ -45,6 +47,19 @@ const (
PROPALGATION_EVENT
SEARCH_EVENT
CONFIRM_EVENT
CONSIDERS_EVENT
ADMIRALTY_CONFIG_EVENT
MINIO_CONFIG_EVENT
// Workflow lifecycle events emitted by oc-monitord.
// oc-scheduler listens to STARTED and DONE to maintain WorkflowExecution state.
// oc-datacenter listens to STEP_DONE and DONE to close bookings and tear down infra.
WORKFLOW_STARTED_EVENT
WORKFLOW_STEP_DONE_EVENT
WORKFLOW_DONE_EVENT
)
func (n NATSMethod) String() string {
@@ -54,7 +69,9 @@ func (n NATSMethod) String() string {
// NameToMethod returns the NATSMethod enum value from a string
func NameToMethod(name string) NATSMethod {
for _, v := range [...]NATSMethod{REMOVE_EXECUTION, CREATE_EXECUTION, PLANNER_EXECUTION, DISCOVERY, WORKFLOW_EVENT, ARGO_KUBE_EVENT,
CREATE_RESOURCE, REMOVE_RESOURCE, PROPALGATION_EVENT, SEARCH_EVENT} {
CREATE_RESOURCE, REMOVE_RESOURCE, PROPALGATION_EVENT, SEARCH_EVENT, CONFIRM_EVENT,
CONSIDERS_EVENT, ADMIRALTY_CONFIG_EVENT, MINIO_CONFIG_EVENT,
WORKFLOW_STARTED_EVENT, WORKFLOW_STEP_DONE_EVENT, WORKFLOW_DONE_EVENT} {
if strings.Contains(strings.ToLower(v.String()), strings.ToLower(name)) {
return v
}

View File

@@ -0,0 +1,33 @@
package tools
import "time"
// StepMetric carries the outcome of one Argo step node as observed by oc-monitord.
// Embedded in WorkflowLifecycleEvent.Steps for the WORKFLOW_DONE_EVENT recap.
type StepMetric struct {
BookingID string `json:"booking_id"`
State int `json:"state"`
RealStart *time.Time `json:"real_start,omitempty"`
RealEnd *time.Time `json:"real_end,omitempty"`
}
// WorkflowLifecycleEvent is the NATS payload emitted by oc-monitord on
// WORKFLOW_STARTED_EVENT, WORKFLOW_STEP_DONE_EVENT, and WORKFLOW_DONE_EVENT.
//
// - ExecutionID : WorkflowExecution UUID (used by oc-scheduler to update state)
// - ExecutionsID : run-group ID shared by all bookings of the same run
// - BookingID : non-empty only for WORKFLOW_STEP_DONE_EVENT
// - State : target state (enum index: SUCCESS=3, FAILURE=4, STARTED=2, …)
// - RealStart : actual start timestamp recorded by Argo (nil if unknown)
// - RealEnd : actual end timestamp recorded by Argo (nil for STARTED events)
// - Steps : non-nil only for WORKFLOW_DONE_EVENT — full recap of every step
// so oc-scheduler and oc-catalog can catch up if they missed STEP_DONE events
type WorkflowLifecycleEvent struct {
ExecutionID string `json:"execution_id"`
ExecutionsID string `json:"executions_id"`
BookingID string `json:"booking_id,omitempty"`
State int `json:"state"`
RealStart *time.Time `json:"real_start,omitempty"`
RealEnd *time.Time `json:"real_end,omitempty"`
Steps []StepMetric `json:"steps,omitempty"`
}