67 Commits

Author SHA1 Message Date
mr
88d2e52628 Correct 2026-03-20 16:14:07 +01:00
mr
9f861e5b8d Set up 2026-03-20 15:41:33 +01:00
mr
e4506f3b42 longest trace 2026-03-20 15:21:48 +01:00
mr
75d08aae7c time longest 2026-03-20 15:09:52 +01:00
mr
b288085f32 if 100% kick 2026-03-20 14:57:01 +01:00
mr
bd3e81be0c CHECK log 2026-03-20 14:51:08 +01:00
mr
fafa1186c2 out * 1 hour 2026-03-20 14:42:48 +01:00
mr
471eaff94c missing instanceID 2026-03-20 14:38:52 +01:00
mr
c9fcabac6e debug time 2026-03-20 14:32:46 +01:00
mr
478e68e6d4 Workout Time Scheduling 2026-03-20 14:20:26 +01:00
mr
5619010838 correct time loc 2026-03-20 14:01:14 +01:00
mr
f1a9214ac7 Check trigger strange 2026-03-20 13:41:12 +01:00
mr
e6eb516f39 ensurePricing 2026-03-20 13:28:35 +01:00
mr
1508cc3611 PricedItem evolved 2026-03-20 13:07:06 +01:00
mr
2abc035ec0 planner trace 2026-03-20 12:09:28 +01:00
mr
c34b8c6703 correction planner 2026-03-20 11:33:59 +01:00
mr
a62fbc6c7a Workflow lifecycle events + resource instance duration tracking
- Add WorkflowLifecycleEvent + StepMetric to tools/workflow_lifecycle.go
- Add WORKFLOW_STARTED_EVENT, WORKFLOW_STEP_DONE_EVENT, WORKFLOW_DONE_EVENT NATS methods
- ResourceInstance.UpdateAverageDuration for AverageDurationS running average
- Support Steps recap in WORKFLOW_DONE_EVENT for catch-up by oc-scheduler/oc-catalog

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-20 10:30:30 +01:00
mr
6e28dce02c provisionning 2026-03-19 15:52:55 +01:00
mr
fe3b185b60 err trace 2026-03-19 12:05:33 +01:00
mr
6641d38d9d DBAbstract 2026-03-19 11:32:51 +01:00
mr
93ad8db9a8 decoded CA 2026-03-19 11:17:14 +01:00
mr
4eb53917b8 Log 2026-03-19 10:50:00 +01:00
mr
c7884f5cde NewKubernetesService decoded 2026-03-19 09:05:42 +01:00
mr
5fca0480af suppress check error on get 2026-03-19 08:44:25 +01:00
mr
28b5b7d39f Provisionning Ns + TearDown Ns 2026-03-19 08:18:18 +01:00
mr
5b7edb53a9 OcLib 2026-03-19 07:56:47 +01:00
mr
5976795d44 New Channel to Clarify Movement 2026-03-18 15:38:22 +01:00
mr
3d22ff40fb PB -> ADMIRALTY + MINIO 2026-03-18 14:58:21 +01:00
mr
889656a95e argo kube event remains 2026-03-18 14:52:07 +01:00
mr
c66fbc809e argo event 2026-03-18 14:46:35 +01:00
mr
1a37a1b4aa loki adjust 2026-03-18 10:28:31 +01:00
mr
d4ac398cdb plantuml debug 2026-03-18 09:41:09 +01:00
mr
4eb112bee3 Debug 2026-03-18 09:17:22 +01:00
mr
d1214fe622 adjust Export 2026-03-18 09:10:58 +01:00
mr
6a907236fa export 2026-03-18 08:40:39 +01:00
mr
85314baac3 PlantUML doc & Human Readable commentary 2026-03-18 08:30:02 +01:00
mr
cec8033ddc by pass restriction 2026-03-17 16:46:40 +01:00
mr
d0645f5ca7 publishing is only allowed is it can be monitored and be accessible temp disable 2026-03-17 16:42:03 +01:00
mr
c39bc52312 setup draft as live 2026-03-17 16:35:35 +01:00
mr
0a87343e3e Copy 2026-03-17 16:24:42 +01:00
mr
96beaade24 access 2026-03-17 16:15:12 +01:00
mr
5753450965 oclib setup 2026-03-17 16:09:39 +01:00
mr
7f8d697e4c \n replaceAll 2026-03-17 15:49:27 +01:00
mr
94837f8d24 kicks out Required not Required 2026-03-17 15:31:25 +01:00
mr
e758144b46 forgot 2026-03-17 15:25:54 +01:00
mr
72be3118b7 NATSMethod 2026-03-17 14:59:27 +01:00
mr
67778e1e47 err 2026-03-17 14:54:13 +01:00
mr
562dfb18c1 graphItem 2026-03-17 14:37:06 +01:00
mr
2a2dd96870 graphVarName 2026-03-17 14:32:39 +01:00
mr
333476e2c5 Setup 2026-03-17 14:26:11 +01:00
mr
0fd2513278 Setup 2026-03-17 14:17:49 +01:00
mr
e79101f58d oc-lib 2026-03-17 14:03:19 +01:00
mr
b3dbc7687e setup 2026-03-17 13:52:43 +01:00
mr
8fd4f5faef items 2026-03-17 13:36:36 +01:00
mr
f7012e285f setup 2026-03-17 13:29:54 +01:00
mr
088b45b2cf Set up 2026-03-17 13:19:51 +01:00
mr
1ac735cef1 Stop rebuild id 2026-03-17 10:04:40 +01:00
mr
65237f0d1f implement 2026-03-17 09:32:02 +01:00
mr
9b2f945176 forgot about sessionID use ExecutionsID 2026-03-17 09:01:47 +01:00
mr
b110cbc260 error on usage start 2026-03-16 15:59:19 +01:00
mr
a4d81cbb67 sec 2026-03-16 13:16:50 +01:00
mr
9bf2c566e9 test 2026-03-16 12:48:21 +01:00
mr
6d8efd137a After 2026-03-16 12:32:39 +01:00
mr
40a986af41 order per session 2026-03-16 11:47:51 +01:00
mr
4a076ba237 SchedulingSessionID 2026-03-16 11:45:58 +01:00
mr
deb819c5af After check 2026-03-16 11:41:05 +01:00
mr
55a039bd66 follow date 2026-03-16 11:40:00 +01:00
43 changed files with 1317 additions and 414 deletions

View File

@@ -0,0 +1,214 @@
# PlantUML Format de commentaire human-readable
Ce document décrit la syntaxe des commentaires attachés aux ressources et aux liens
dans les fichiers PlantUML importés par OpenCloud.
---
## Syntaxe générale
```plantuml
TypeRessource(varName, "Nom affiché") ' clé: valeur, clé.sous_clé: valeur
```
### Règles de parsing
| Règle | Détail |
|---|---|
| Séparateur de paires | `,` |
| Séparateur clé/valeur | premier `:` de la paire (les URLs `http://...` sont gérées) |
| Sous-objets | notation pointée `access.container.image: nginx` |
| Types | auto-inférés : `bool` > `float64` > `string` |
| Fallback | JSON brut si le commentaire commence par `{` (compatibilité ascendante) |
### Comportement à l'import
Chaque ressource reçoit automatiquement une **instance par défaut**, seedée avec les
attributs de la ressource parente. Le commentaire vient ensuite **surcharger** uniquement
les champs explicitement renseignés.
> **Exception :** `WorkflowEvent` n'a pas d'instance (voir section dédiée).
---
## Ressources disponibles
### `Data(var, "nom")` Données
Ressource de données. Les attributs qualifient le modèle de données **et** son instance
(source d'accès).
| Clé | Type | Description |
|---|---|---|
| `type` | string | Type de données (`raster`, `vector`, `tabular`) |
| `quality` | string | Niveau de qualité |
| `open_data` | bool | Données en accès libre |
| `static` | bool | Données statiques (pas de mise à jour) |
| `personal_data` | bool | Contient des données personnelles |
| `anonymized_personal_data` | bool | Données personnelles anonymisées |
| `size` | float64 | Taille en GB |
| `access_protocol` | string | Protocole d'accès (`http`, `s3`, `ftp`) |
| `country` | string | Code pays ISO (`FR`, `DE`) |
| `location.latitude` | float64 | Latitude géographique |
| `location.longitude` | float64 | Longitude géographique |
| `source` | string | URL / endpoint d'accès à la donnée |
```plantuml
Data(d1, "Satellites L2A") ' type: raster, open_data: true, size: 120.5, source: https://catalogue.example.com, country: FR
```
---
### `Processing(var, "nom")` Traitement
Ressource de traitement (algorithme, conteneur, service). Les attributs qualifient
le modèle de traitement **et** sa configuration d'exécution.
| Clé | Type | Description |
|---|---|---|
| `infrastructure` | int | Infrastructure cible : `0`=DOCKER, `1`=KUBERNETES, `2`=SLURM, `3`=HW, `4`=CONDOR |
| `is_service` | bool | Traitement persistant (service long-running) |
| `open_source` | bool | Code source ouvert |
| `license` | string | Licence (`MIT`, `Apache-2.0`, `GPL-3.0`) |
| `maturity` | string | Maturité (`prototype`, `beta`, `production`) |
| `access_protocol` | string | Protocole d'accès |
| `country` | string | Code pays ISO |
| `location.latitude` | float64 | Latitude |
| `location.longitude` | float64 | Longitude |
| `access.container.image` | string | Image du conteneur |
| `access.container.command` | string | Commande de démarrage |
| `access.container.args` | string | Arguments de la commande |
```plantuml
Processing(p1, "NDVI Calc") ' infrastructure: 0, open_source: true, license: MIT, maturity: production, access.container.image: myrepo/ndvi:1.2
```
---
### `Storage(var, "nom")` Stockage
Ressource de stockage. Produit une instance live (`LiveStorage`) à l'import.
| Clé | Type | Description |
|---|---|---|
| `storage_type` | int | Type de stockage (enum) |
| `source` | string | URL / endpoint du stockage |
| `path` | string | Chemin ou bucket dans le stockage |
| `local` | bool | Stockage local |
| `security_level` | string | Niveau de sécurité |
| `size` | float64 | Taille allouée en GB |
| `encryption` | bool | Chiffrement activé |
| `redundancy` | string | Politique de redondance |
| `throughput` | string | Débit cible |
| `access_protocol` | string | Protocole (`s3`, `nfs`, `smb`) |
| `country` | string | Code pays ISO |
| `location.latitude` | float64 | Latitude |
| `location.longitude` | float64 | Longitude |
```plantuml
Storage(s1, "Minio OVH") ' source: http://minio.example.com:9000, path: /bucket/data, access_protocol: s3, encryption: true, size: 500, country: FR
```
---
### `ComputeUnit(var, "nom")` Unité de calcul
Ressource de calcul (datacenter, cluster). Produit une instance live (`LiveDatacenter`)
à l'import.
| Clé | Type | Description |
|---|---|---|
| `architecture` | string | Architecture CPU (`x86_64`, `arm64`) |
| `infrastructure` | int | `0`=DOCKER, `1`=KUBERNETES, `2`=SLURM, `3`=HW, `4`=CONDOR |
| `source` | string | URL de l'API du datacenter |
| `security_level` | string | Niveau de sécurité |
| `annual_co2_emissions` | float64 | Émissions CO annuelles (kg) |
| `access_protocol` | string | Protocole d'accès |
| `country` | string | Code pays ISO |
| `location.latitude` | float64 | Latitude |
| `location.longitude` | float64 | Longitude |
```plantuml
ComputeUnit(c1, "Datacenter Rennes") ' source: https://api.dc-rennes.example.com, infrastructure: 1, country: FR, location.latitude: 48.11, location.longitude: -1.68, security_level: high
```
---
### `WorkflowEvent(var, "nom")` Événement déclencheur de workflow
Crée directement un `NativeTool` de type `WORKFLOW_EVENT` (Kind = 0).
Représente le point de départ d'un workflow.
> **Pas d'instance. Pas de commentaire.**
> Le nom du `NativeTool` est forcé à `WORKFLOW_EVENT` à l'import.
```plantuml
WorkflowEvent(e1, "Start")
```
---
## Liens
Les commentaires sur les liens qualifient la connexion entre deux ressources
(typiquement entre un traitement et un stockage).
### Syntaxe
```plantuml
source --> destination ' clé: valeur
source <-- destination ' clé: valeur
source -- destination ' clé: valeur (non directionnel)
```
### Attributs disponibles
| Clé | Type | Description |
|---|---|---|
| `storage_link_infos.write` | bool | `true` = écriture, `false` = lecture |
| `storage_link_infos.source` | string | Chemin source dans le lien |
| `storage_link_infos.destination` | string | Chemin destination dans le lien |
| `storage_link_infos.filename` | string | Nom du fichier échangé |
```plantuml
p1 --> s1 ' storage_link_infos.write: true, storage_link_infos.filename: output.tif
d1 --> p1
```
---
## Exemple complet
```plantuml
@startuml
!include opencloud.puml
WorkflowEvent(e1, "Start")
Data(d1, "Satellites L2A") ' type: raster, open_data: true, size: 120.5, source: https://catalogue.example.com, country: FR
Processing(p1, "NDVI") ' infrastructure: 0, open_source: true, license: MIT, access.container.image: myrepo/ndvi:1.2
Storage(s1, "Minio résultats") ' source: http://minio.example.com:9000, path: /results, access_protocol: s3, encryption: true, size: 500, country: FR
ComputeUnit(c1, "DC Rennes") ' source: https://api.dc.example.com, infrastructure: 1, country: FR, location.latitude: 48.11, location.longitude: -1.68
e1 --> p1
d1 --> p1
p1 --> s1 ' storage_link_infos.write: true, storage_link_infos.filename: ndvi.tif
s1 --> c1
@enduml
```
---
## Récapitulatif des types de ressources
| Mot-clé PlantUML | Type Go | Instance | Live | Commentaire |
|---|---|---|---|---|
| `Data` | `DataResource` | `DataInstance` | non | oui |
| `Processing` | `ProcessingResource` | `ProcessingInstance` | non | oui |
| `Storage` | `StorageResource` | `StorageResourceInstance` | oui `LiveStorage` | oui |
| `ComputeUnit` | `ComputeResource` | `ComputeResourceInstance` | oui `LiveDatacenter` | oui |
| `WorkflowEvent` | `NativeTool` (Kind=WORKFLOW_EVENT) | aucune | non | non |

View File

@@ -58,14 +58,13 @@ func (w *LokiWriter) Write(p []byte) (n int, err error) {
// Add label that have been added to the event
// A bit unsafe since we don't know what could be stored in the event
// but we can't access this object once passed to the multilevel writter
for k,v := range(event){
if k != "level" && k != "time" && k != "message"{
labels[k] = v.(string)
for k, v := range event {
if k != "level" && k != "time" && k != "message" {
labels[k] = fmt.Sprintf("%v", v)
}
}
// Format the timestamp in nanoseconds
timestamp := fmt.Sprintf("%d000000", time.Now().UnixNano()/int64(time.Millisecond))
@@ -87,7 +86,7 @@ func (w *LokiWriter) Write(p []byte) (n int, err error) {
//fmt.Printf("Sending payload to Loki: %s\n", string(payloadBytes))
req, err := http.NewRequest("POST", w.url + "/loki/api/v1/push", bytes.NewReader(payloadBytes))
req, err := http.NewRequest("POST", w.url+"/loki/api/v1/push", bytes.NewReader(payloadBytes))
if err != nil {
return 0, fmt.Errorf("failed to create HTTP request: %w", err)
}

View File

@@ -2,12 +2,12 @@ package bill
import (
"encoding/json"
"fmt"
"sync"
"time"
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/common/pricing"
"cloud.o-forge.io/core/oc-lib/models/order"
"cloud.o-forge.io/core/oc-lib/models/peer"
"cloud.o-forge.io/core/oc-lib/models/resources"
@@ -49,9 +49,10 @@ func DraftFirstBill(order *order.Order, request *tools.APIRequest) (*Bill, error
peers[p.DestPeerID] = []*PeerItemOrder{}
}
peers[p.DestPeerID] = append(peers[p.DestPeerID], &PeerItemOrder{
Purchase: p,
Item: p.PricedItem,
Quantity: 1,
ResourceType: p.ResourceType,
Purchase: p,
Item: p.PricedItem,
Quantity: 1,
})
}
for _, b := range order.Bookings {
@@ -70,7 +71,9 @@ func DraftFirstBill(order *order.Order, request *tools.APIRequest) (*Bill, error
peers[b.DestPeerID] = []*PeerItemOrder{}
}
peers[b.DestPeerID] = append(peers[b.DestPeerID], &PeerItemOrder{
Item: b.PricedItem,
ResourceType: b.ResourceType,
Quantity: 1,
Item: b.PricedItem,
})
}
peerOrders := map[string]*PeerOrder{}
@@ -136,6 +139,22 @@ type PeerOrder struct {
Total float64 `json:"total,omitempty" bson:"total,omitempty"`
}
func PricedByType(dt tools.DataType) pricing.PricedItemITF {
switch dt {
case tools.PROCESSING_RESOURCE:
return &resources.PricedProcessingResource{}
case tools.STORAGE_RESOURCE:
return &resources.PricedStorageResource{}
case tools.DATA_RESOURCE:
return &resources.PricedDataResource{}
case tools.COMPUTE_RESOURCE:
return &resources.PricedComputeResource{}
case tools.WORKFLOW_RESOURCE:
return &resources.PricedResource[*pricing.ExploitPricingProfile[pricing.TimePricingStrategy]]{}
}
return nil
}
func (d *PeerOrder) Pay(request *tools.APIRequest, response chan *PeerOrder, wg *sync.WaitGroup) {
d.Status = enum.PENDING
@@ -145,7 +164,7 @@ func (d *PeerOrder) Pay(request *tools.APIRequest, response chan *PeerOrder, wg
d.Status = enum.PAID // TO REMOVE LATER IT'S A MOCK
if d.Status == enum.PAID {
for _, b := range d.Items {
var priced *resources.PricedResource
priced := PricedByType(b.ResourceType)
bb, _ := json.Marshal(b.Item)
json.Unmarshal(bb, priced)
if !priced.IsPurchasable() {
@@ -179,9 +198,10 @@ func (d *PeerOrder) SumUpBill(request *tools.APIRequest) error {
}
type PeerItemOrder struct {
Quantity int `json:"quantity,omitempty" bson:"quantity,omitempty"`
Purchase *purchase_resource.PurchaseResource `json:"purchase,omitempty" bson:"purchase,omitempty"`
Item map[string]interface{} `json:"item,omitempty" bson:"item,omitempty"`
ResourceType tools.DataType `json:"datatype,omitempty" bson:"datatype,omitempty"`
Quantity int `json:"quantity,omitempty" bson:"quantity,omitempty"`
Purchase *purchase_resource.PurchaseResource `json:"purchase,omitempty" bson:"purchase,omitempty"`
Item map[string]interface{} `json:"item,omitempty" bson:"item,omitempty"`
}
func (d *PeerItemOrder) GetPriceHT(request *tools.APIRequest) (float64, error) {
@@ -190,11 +210,10 @@ func (d *PeerItemOrder) GetPriceHT(request *tools.APIRequest) (float64, error) {
return 0, nil
}
///////////
var priced *resources.PricedResource
priced := PricedByType(d.ResourceType)
b, _ := json.Marshal(d.Item)
err := json.Unmarshal(b, priced)
if err != nil {
fmt.Println(err)
return 0, err
}
accessor := purchase_resource.NewAccessor(request)

View File

@@ -3,12 +3,10 @@ package booking
import (
"time"
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/common/models"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
"go.mongodb.org/mongo-driver/bson/primitive"
)
/*
@@ -38,8 +36,7 @@ type Booking struct {
// Authorization: identifies who created this draft and the Check session it belongs to.
// Used to verify UPDATE and DELETE orders from remote schedulers.
SchedulerPeerID string `json:"scheduler_peer_id,omitempty" bson:"scheduler_peer_id,omitempty"`
SchedulingSessionID string `json:"scheduling_session_id,omitempty" bson:"scheduling_session_id,omitempty"`
SchedulerPeerID string `json:"scheduler_peer_id,omitempty" bson:"scheduler_peer_id,omitempty"`
}
func (b *Booking) CalcDeltaOfExecution() map[string]map[string]models.MetricResume {
@@ -69,40 +66,15 @@ func (b *Booking) CalcDeltaOfExecution() map[string]map[string]models.MetricResu
return m
}
// CheckBooking checks if a booking is possible on a specific compute resource
func (wfa *Booking) Check(id string, start time.Time, end *time.Time, parrallelAllowed int) (bool, error) {
// check if
if end == nil {
// if no end... then Book like a savage
e := start.Add(time.Hour)
end = &e
}
accessor := NewAccessor(nil)
res, code, err := accessor.Search(&dbs.Filters{
And: map[string][]dbs.Filter{ // check if there is a booking on the same compute resource by filtering on the compute_resource_id, the state and the execution date
"resource_id": {{Operator: dbs.EQUAL.String(), Value: id}},
"state": {{Operator: dbs.EQUAL.String(), Value: enum.DRAFT.EnumIndex()}},
"expected_start_date": {
{Operator: dbs.LTE.String(), Value: primitive.NewDateTimeFromTime(*end)},
{Operator: dbs.GTE.String(), Value: primitive.NewDateTimeFromTime(start)},
},
},
}, "", wfa.IsDraft)
if code != 200 {
return false, err
}
return len(res) <= parrallelAllowed, nil
}
func (d *Booking) GetDelayForLaunch() time.Duration {
return d.RealStartDate.Sub(d.ExpectedStartDate)
}
func (d *Booking) GetDelayForFinishing() time.Duration {
if d.ExpectedEndDate == nil {
if d.ExpectedEndDate == nil || d.RealEndDate == nil {
return time.Duration(0)
}
return d.RealEndDate.Sub(d.ExpectedStartDate)
return d.RealEndDate.Sub(*d.ExpectedEndDate)
}
func (d *Booking) GetUsualDuration() time.Duration {
@@ -134,14 +106,20 @@ func (r *Booking) StoreDraftDefault() {
}
func (r *Booking) CanUpdate(set utils.DBObject) (bool, utils.DBObject) {
if !r.IsDraft && r.State != set.(*Booking).State || r.RealStartDate != set.(*Booking).RealStartDate || r.RealEndDate != set.(*Booking).RealEndDate {
return true, &Booking{
State: set.(*Booking).State,
RealStartDate: set.(*Booking).RealStartDate,
RealEndDate: set.(*Booking).RealEndDate,
} // only state can be updated
incoming := set.(*Booking)
if !r.IsDraft && r.State != incoming.State || r.RealStartDate != incoming.RealStartDate || r.RealEndDate != incoming.RealEndDate {
patch := &Booking{
State: incoming.State,
RealStartDate: incoming.RealStartDate,
RealEndDate: incoming.RealEndDate,
}
// Auto-set RealStartDate when transitioning to STARTED and not already set
if r.State != enum.STARTED && incoming.State == enum.STARTED && patch.RealStartDate == nil {
now := time.Now()
patch.RealStartDate = &now
}
return true, patch
}
// TODO : HERE WE CAN HANDLE THE CASE WHERE THE BOOKING IS DELAYED OR EXCEEDING OR ending sooner
return r.IsDraft, set
}

View File

@@ -2,9 +2,12 @@ package planner
import (
"encoding/json"
"sort"
"time"
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/models/booking"
"cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/tools"
)
@@ -37,6 +40,12 @@ type PlannerSlot struct {
Usage map[string]float64 `json:"usage,omitempty"` // dimension -> % of max (0-100)
}
// PlannerITF is the interface used by Planify to check resource availability.
// *Planner satisfies this interface.
type PlannerITF interface {
NextAvailableStart(resourceID, instanceID string, start time.Time, d time.Duration) time.Time
}
// Planner is a volatile (non-persisted) object that organises bookings by resource.
// Only ComputeResource and StorageResource bookings appear in the schedule.
type Planner struct {
@@ -61,11 +70,19 @@ func generate(request *tools.APIRequest, shallow bool) (*Planner, error) {
// Include both confirmed (IsDraft=false) and draft (IsDraft=true) bookings
// so the planner reflects the full picture: first-come first-served on all
// pending reservations regardless of confirmation state.
confirmed, code, err := accessor.Search(nil, "*", false)
confirmed, code, err := accessor.Search(&dbs.Filters{
And: map[string][]dbs.Filter{
"expected_start_date": {{Operator: dbs.GTE.String(), Value: time.Now().UTC()}},
},
}, "*", false)
if code != 200 || err != nil {
return nil, err
}
drafts, _, _ := accessor.Search(nil, "*", true)
drafts, _, _ := accessor.Search(&dbs.Filters{
And: map[string][]dbs.Filter{
"expected_start_date": {{Operator: dbs.GTE.String(), Value: time.Now().UTC()}},
},
}, "*", true)
bookings := append(confirmed, drafts...)
p := &Planner{
@@ -77,6 +94,12 @@ func generate(request *tools.APIRequest, shallow bool) (*Planner, error) {
for _, b := range bookings {
bk := b.(*booking.Booking)
// Skip terminal bookings — they no longer occupy capacity.
switch bk.State {
case enum.SUCCESS, enum.FAILURE, enum.FORGOTTEN, enum.CANCELLED:
continue
}
// Only compute and storage resources are eligible
if bk.ResourceType != tools.COMPUTE_RESOURCE && bk.ResourceType != tools.STORAGE_RESOURCE {
continue
@@ -84,11 +107,14 @@ func generate(request *tools.APIRequest, shallow bool) (*Planner, error) {
end := bk.ExpectedEndDate
if end == nil {
e := bk.ExpectedStartDate.Add(time.Hour)
e := bk.ExpectedStartDate.UTC().Add(5 * time.Minute)
end = &e
}
instanceID, usage, cap := extractSlotData(bk, request)
if instanceID == "" {
instanceID = bk.InstanceID
}
if cap != nil && instanceID != "" {
if p.Capacities[bk.ResourceID] == nil {
@@ -126,7 +152,7 @@ func generate(request *tools.APIRequest, shallow bool) (*Planner, error) {
// If no capacity is known for this instance (never booked), it is fully available.
func (p *Planner) Check(resourceID string, instanceID string, req *ResourceRequest, start time.Time, end *time.Time) bool {
if end == nil {
e := start.Add(time.Hour)
e := start.Add(5 * time.Minute)
end = &e
}
@@ -137,7 +163,6 @@ func (p *Planner) Check(resourceID string, instanceID string, req *ResourceReque
if !ok {
return true
}
for _, slot := range slots {
// Only consider slots on the same instance
if slot.InstanceID != instanceID {
@@ -147,9 +172,13 @@ func (p *Planner) Check(resourceID string, instanceID string, req *ResourceReque
if !slot.Start.Before(*end) || !slot.End.After(start) {
continue
}
// If capacity is unknown (reqPct empty), any overlap blocks the slot.
if len(reqPct) == 0 {
return false
}
// Combined usage must not exceed 100 % for any requested dimension
for dim, needed := range reqPct {
if slot.Usage[dim]+needed > 100.0 {
if slot.Usage[dim]+needed >= 100.0 {
return false
}
}
@@ -255,7 +284,6 @@ func extractSlotData(bk *booking.Booking, request *tools.APIRequest) (instanceID
if err != nil {
return
}
switch bk.ResourceType {
case tools.COMPUTE_RESOURCE:
instanceID, usage, cap = extractComputeSlot(b, bk.ResourceID, request)
@@ -437,3 +465,34 @@ func totalRAM(instance *resources.ComputeResourceInstance) float64 {
}
return total
}
// NextAvailableStart returns the earliest time >= start when resourceID/instanceID has a
// free window of duration d. Slots are scanned in order so a single linear pass suffices.
// If the planner has no slots for this resource/instance, start is returned unchanged.
func (p *Planner) NextAvailableStart(resourceID, instanceID string, start time.Time, d time.Duration) time.Time {
slots := p.Schedule[resourceID]
if len(slots) == 0 {
return start
}
// Collect and sort slots for this instance by start time.
relevant := make([]*PlannerSlot, 0, len(slots))
for _, s := range slots {
if s.InstanceID == instanceID {
relevant = append(relevant, s)
}
}
sort.Slice(relevant, func(i, j int) bool { return relevant[i].Start.Before(relevant[j].Start) })
end := start.Add(d)
for _, slot := range relevant {
if !slot.Start.Before(end) {
break // all remaining slots start after our window — done
}
if slot.End.After(start) {
// conflict: push start to after this slot
start = slot.End
end = start.Add(d)
}
}
return start
}

View File

@@ -13,8 +13,8 @@ import (
)
func TestBooking_GetDurations(t *testing.T) {
start := time.Now().Add(-2 * time.Hour)
end := start.Add(1 * time.Hour)
start := time.Now().Add(-10 * time.Minute)
end := start.Add(5 * time.Minute)
realStart := start.Add(30 * time.Minute)
realEnd := realStart.Add(90 * time.Minute)

View File

@@ -92,7 +92,6 @@ func filterEnrich[T utils.ShallowDBObject](arr []string, isDrafted bool, a utils
"abstractobject.id": {{Operator: dbs.IN.String(), Value: arr}},
},
}, "", isDrafted)
fmt.Println(res, arr, isDrafted, a)
if code == 200 {
for _, r := range res {
new = append(new, r.(T))

View File

@@ -7,36 +7,36 @@ import (
"cloud.o-forge.io/core/oc-lib/tools"
)
func GetPlannerNearestStart(start time.Time, planned map[tools.DataType]map[string]pricing.PricedItemITF, request *tools.APIRequest) float64 {
near := float64(10000000000) // set a high value
func GetPlannerNearestStart(start time.Time, planned map[tools.DataType]map[string]pricing.PricedItemITF) float64 {
near := float64(-1) // unset sentinel
for _, items := range planned { // loop through the planned items
for _, priced := range items { // loop through the priced items
if priced.GetLocationStart() == nil { // if the start is nil,
continue // skip the iteration
}
newS := priced.GetLocationStart() // get the start
if newS.Sub(start).Seconds() < near { // if the difference between the start and the new start is less than the nearest start
near = newS.Sub(start).Seconds()
newS := priced.GetLocationStart() // get the start
diff := newS.Sub(start).Seconds() // get the difference
if near < 0 || diff < near { // if the difference is less than the nearest start
near = diff
}
}
}
if near < 0 {
return 0 // no items found, start at the given start time
}
return near
}
func GetPlannerLongestTime(end *time.Time, planned map[tools.DataType]map[string]pricing.PricedItemITF, request *tools.APIRequest) float64 {
if end == nil {
return -1
}
// GetPlannerLongestTime returns the sum of all processing durations (conservative estimate).
// Returns -1 if any processing is a service (open-ended).
func GetPlannerLongestTime(planned map[tools.DataType]map[string]pricing.PricedItemITF) float64 {
longestTime := float64(0)
for _, priced := range planned[tools.PROCESSING_RESOURCE] {
if priced.GetLocationEnd() == nil {
continue
d := priced.GetExplicitDurationInS()
if d < 0 {
return -1 // service present: booking is open-ended
}
newS := priced.GetLocationEnd()
if end == nil && longestTime < newS.Sub(*end).Seconds() {
longestTime = newS.Sub(*end).Seconds()
}
// get the nearest start from start var
longestTime += d
}
return longestTime
}

View File

@@ -112,7 +112,7 @@ func getAverageTimeInSecond(averageTimeInSecond float64, start time.Time, end *t
fromAverageDuration := after.Sub(now).Seconds()
var tEnd time.Time
if end == nil {
tEnd = start.Add(1 * time.Hour)
tEnd = start.Add(5 * time.Minute)
} else {
tEnd = *end
}

View File

@@ -63,7 +63,7 @@ func Test_getAverageTimeInSecond_WithoutEnd(t *testing.T) {
func TestBookingEstimation(t *testing.T) {
start := time.Now()
end := start.Add(2 * time.Hour)
end := start.Add(10 * time.Minute)
strategies := map[pricing.TimePricingStrategy]float64{
pricing.ONCE: 50,
pricing.PER_HOUR: 10,
@@ -102,7 +102,7 @@ func TestPricingStrategy_Getters(t *testing.T) {
func TestPricingStrategy_GetPriceHT(t *testing.T) {
start := time.Now()
end := start.Add(1 * time.Hour)
end := start.Add(5 * time.Minute)
// SUBSCRIPTION case
ps := pricing.PricingStrategy[DummyStrategy]{

View File

@@ -63,7 +63,7 @@ func (r *AbstractLive) GetResourceType() tools.DataType {
}
func (r *AbstractLive) StoreDraftDefault() {
r.IsDraft = true
r.IsDraft = false
}
func (r *AbstractLive) CanDelete() bool {

View File

@@ -9,13 +9,13 @@ import (
"cloud.o-forge.io/core/oc-lib/tools"
)
type computeUnitsMongoAccessor[T LiveInterface] struct {
type liveMongoAccessor[T LiveInterface] struct {
utils.AbstractAccessor[LiveInterface] // AbstractAccessor contains the basic fields of an accessor (model, caller)
}
// New creates a new instance of the computeUnitsMongoAccessor
func NewAccessor[T LiveInterface](t tools.DataType, request *tools.APIRequest) *computeUnitsMongoAccessor[T] {
return &computeUnitsMongoAccessor[T]{
func NewAccessor[T LiveInterface](t tools.DataType, request *tools.APIRequest) *liveMongoAccessor[T] {
return &liveMongoAccessor[T]{
AbstractAccessor: utils.AbstractAccessor[LiveInterface]{
Logger: logs.CreateLogger(t.String()), // Create a logger with the data type
Request: request,
@@ -36,15 +36,15 @@ func NewAccessor[T LiveInterface](t tools.DataType, request *tools.APIRequest) *
/*
* Nothing special here, just the basic CRUD operations
*/
func (a *computeUnitsMongoAccessor[T]) CopyOne(data utils.DBObject) (utils.DBObject, int, error) {
func (a *liveMongoAccessor[T]) CopyOne(data utils.DBObject) (utils.DBObject, int, error) {
// is a publisher... that become a resources.
if data.IsDrafted() {
return nil, 422, errors.New("can't publish a drafted compute units")
}
live := data.(T)
if live.GetMonitorPath() == "" || live.GetID() != "" {
/*if live.GetMonitorPath() == "" || live.GetID() != "" {
return nil, 422, errors.New("publishing is only allowed is it can be monitored and be accessible")
}
}*/
if res, code, err := a.LoadOne(live.GetID()); err != nil {
return nil, code, err
} else {
@@ -57,7 +57,6 @@ func (a *computeUnitsMongoAccessor[T]) CopyOne(data utils.DBObject) (utils.DBObj
if len(live.GetResourcesID()) > 0 {
for _, r := range live.GetResourcesID() {
// TODO dependent of a existing resource
res, code, err := resAccess.LoadOne(r)
if err == nil {
return nil, code, err
@@ -78,7 +77,7 @@ func (a *computeUnitsMongoAccessor[T]) CopyOne(data utils.DBObject) (utils.DBObj
b, _ := json.Marshal(live)
json.Unmarshal(b, &r)
live.SetResourceInstance(r, instance)
res, code, err := resAccess.StoreOne(r)
res, code, err := utils.GenericStoreOne(r, resAccess)
if err != nil {
return nil, code, err
}

View File

@@ -17,10 +17,10 @@ import (
type Order struct {
utils.AbstractObject
ExecutionsID string `json:"executions_id" bson:"executions_id" validate:"required"`
Status enum.CompletionStatus `json:"status" bson:"status" default:"0"`
Purchases []*purchase_resource.PurchaseResource `json:"purchases" bson:"purchases"`
Bookings []*booking.Booking `json:"bookings" bson:"bookings"`
ExecutionsID string `json:"executions_id" bson:"executions_id" validate:"required"`
Status enum.CompletionStatus `json:"status" bson:"status" default:"0"`
Purchases []*purchase_resource.PurchaseResource `json:"purchases" bson:"purchases"`
Bookings []*booking.Booking `json:"bookings" bson:"bookings"`
Billing map[pricing.BillingStrategy][]*booking.Booking `json:"billing" bson:"billing"`
}

View File

@@ -41,7 +41,6 @@ func CheckPeerStatus(peerID string, appName string) (*Peer, bool) {
return nil, false
}
url := urlFormat(res.(*Peer).APIUrl, tools.PEER) + "/status" // Format the URL
fmt.Println(url)
state, services := api.CheckRemotePeer(url)
res.(*Peer).ServicesState = services // Update the services states of the peer
access.UpdateOne(res.Serialize(res), peerID) // Update the peer in the db

View File

@@ -36,11 +36,11 @@ func (abs *ComputeResource) ConvertToPricedResource(t tools.DataType, selectedIn
if t != tools.COMPUTE_RESOURCE {
return nil, errors.New("not the proper type expected : cannot convert to priced resource : have " + t.String() + " wait Compute")
}
p, err := abs.AbstractInstanciatedResource.ConvertToPricedResource(t, selectedInstance, selectedPartnership, selectedBuyingStrategy, selectedStrategy, selectedBookingModeIndex, request)
p, err := ConvertToPricedResource[*ComputeResourcePricingProfile](t, selectedInstance, selectedPartnership, selectedBuyingStrategy, selectedStrategy, selectedBookingModeIndex, abs, request)
if err != nil {
return nil, err
}
priced := p.(*PricedResource)
priced := p.(*PricedResource[*ComputeResourcePricingProfile])
return &PricedComputeResource{
PricedResource: *priced,
}, nil
@@ -122,7 +122,10 @@ func (p *ComputeResourcePricingProfile) GetPriceHT(amountOfData float64, explici
return 0, errors.New("params must be set")
}
pp := float64(0)
model := params[1]
model := ""
if len(params) > 1 {
model = params[1]
}
if strings.Contains(params[0], "cpus") && len(params) > 1 {
if _, ok := p.CPUsPrices[model]; ok {
p.Pricing.Price = p.CPUsPrices[model]
@@ -158,18 +161,35 @@ func (p *ComputeResourcePricingProfile) GetPriceHT(amountOfData float64, explici
}
type PricedComputeResource struct {
PricedResource
PricedResource[*ComputeResourcePricingProfile]
CPUsLocated map[string]float64 `json:"cpus_in_use" bson:"cpus_in_use"` // CPUsInUse is the list of CPUs in use
GPUsLocated map[string]float64 `json:"gpus_in_use" bson:"gpus_in_use"` // GPUsInUse is the list of GPUs in use
RAMLocated float64 `json:"ram_in_use" bson:"ram_in_use"` // RAMInUse is the RAM in use
}
func (r *PricedComputeResource) ensurePricing() {
if r.SelectedPricing == nil {
r.SelectedPricing = &ComputeResourcePricingProfile{}
}
}
func (r *PricedComputeResource) IsPurchasable() bool {
r.ensurePricing()
return r.SelectedPricing.IsPurchasable()
}
func (r *PricedComputeResource) IsBooked() bool {
r.ensurePricing()
return r.SelectedPricing.IsBooked()
}
func (r *PricedComputeResource) GetType() tools.DataType {
return tools.COMPUTE_RESOURCE
}
func (r *PricedComputeResource) GetPriceHT() (float64, error) {
r.ensurePricing()
if r.BookingConfiguration == nil {
r.BookingConfiguration = &BookingConfiguration{}
}
@@ -178,12 +198,9 @@ func (r *PricedComputeResource) GetPriceHT() (float64, error) {
r.BookingConfiguration.UsageStart = &now
}
if r.BookingConfiguration.UsageEnd == nil {
add := r.BookingConfiguration.UsageStart.Add(time.Duration(1 * time.Hour))
add := r.BookingConfiguration.UsageStart.Add(time.Duration(5 * time.Minute))
r.BookingConfiguration.UsageEnd = &add
}
if r.SelectedPricing == nil {
return 0, errors.New("pricing profile must be set on Priced Compute" + r.ResourceID)
}
pricing := r.SelectedPricing
price := float64(0)
for _, l := range []map[string]float64{r.CPUsLocated, r.GPUsLocated} {

View File

@@ -41,11 +41,11 @@ func (abs *DataResource) ConvertToPricedResource(t tools.DataType, selectedInsta
if t != tools.DATA_RESOURCE {
return nil, errors.New("not the proper type expected : cannot convert to priced resource : have " + t.String() + " wait Data")
}
p, err := abs.AbstractInstanciatedResource.ConvertToPricedResource(t, selectedInstance, selectedPartnership, selectedBuyingStrategy, selectedStrategy, selectedBookingModeIndex, request)
p, err := ConvertToPricedResource[*DataResourcePricingProfile](t, selectedInstance, selectedPartnership, selectedBuyingStrategy, selectedStrategy, selectedBookingModeIndex, abs, request)
if err != nil {
return nil, err
}
priced := p.(*PricedResource)
priced := p.(*PricedResource[*DataResourcePricingProfile])
return &PricedDataResource{
PricedResource: *priced,
}, nil
@@ -160,15 +160,32 @@ func (p *DataResourcePricingProfile) IsBooked() bool {
}
type PricedDataResource struct {
PricedResource
PricedResource[*DataResourcePricingProfile]
UsageStorageGB float64 `json:"storage_gb,omitempty" bson:"storage_gb,omitempty"`
}
func (r *PricedDataResource) ensurePricing() {
if r.SelectedPricing == nil {
r.SelectedPricing = &DataResourcePricingProfile{}
}
}
func (r *PricedDataResource) IsPurchasable() bool {
r.ensurePricing()
return r.SelectedPricing.IsPurchasable()
}
func (r *PricedDataResource) IsBooked() bool {
r.ensurePricing()
return r.SelectedPricing.IsBooked()
}
func (r *PricedDataResource) GetType() tools.DataType {
return tools.DATA_RESOURCE
}
func (r *PricedDataResource) GetPriceHT() (float64, error) {
r.ensurePricing()
if r.BookingConfiguration == nil {
r.BookingConfiguration = &BookingConfiguration{}
}
@@ -177,12 +194,9 @@ func (r *PricedDataResource) GetPriceHT() (float64, error) {
r.BookingConfiguration.UsageStart = &now
}
if r.BookingConfiguration.UsageEnd == nil {
add := r.BookingConfiguration.UsageStart.Add(time.Duration(1 * time.Hour))
add := r.BookingConfiguration.UsageStart.Add(time.Duration(5 * time.Minute))
r.BookingConfiguration.UsageEnd = &add
}
if r.SelectedPricing == nil {
return 0, errors.New("pricing profile must be set on Priced Data" + r.ResourceID)
}
pricing := r.SelectedPricing
var err error
amountOfData := float64(1)

View File

@@ -8,6 +8,10 @@ import (
"cloud.o-forge.io/core/oc-lib/tools"
)
type PricedResourceITF interface {
pricing.PricedItemITF
}
type ResourceInterface interface {
utils.DBObject
FilterPeer(peerID string) *dbs.Filters
@@ -15,7 +19,7 @@ type ResourceInterface interface {
ConvertToPricedResource(t tools.DataType, a *int, selectedPartnership *int, selectedBuyingStrategy *int, selectedStrategy *int, b *int, request *tools.APIRequest) (pricing.PricedItemITF, error)
GetType() string
ClearEnv() utils.DBObject
SetAllowedInstances(request *tools.APIRequest, instance_id ...string)
SetAllowedInstances(request *tools.APIRequest, instance_id ...string) []ResourceInstanceITF
AddInstances(instance ResourceInstanceITF)
GetSelectedInstance(index *int) ResourceInstanceITF
}
@@ -31,6 +35,8 @@ type ResourceInstanceITF interface {
GetPricingsProfiles(peerID string, groups []string) []pricing.PricingProfileITF
GetPeerGroups() ([]ResourcePartnerITF, []map[string][]string)
ClearPeerGroups()
GetAverageDurationS() float64
UpdateAverageDuration(actualS float64)
}
type ResourcePartnerITF interface {

View File

@@ -37,12 +37,13 @@ func (d *NativeTool) ClearEnv() utils.DBObject {
return d
}
func (w *NativeTool) SetAllowedInstances(request *tools.APIRequest, ids ...string) {
func (w *NativeTool) SetAllowedInstances(request *tools.APIRequest, ids ...string) []ResourceInstanceITF {
/* EMPTY */
return []ResourceInstanceITF{}
}
func (w *NativeTool) ConvertToPricedResource(t tools.DataType, selectedInstance *int, selectedPartnership *int, selectedBuyingStrategy *int, selectedStrategy *int, selectedBookingModeIndex *int, request *tools.APIRequest) (pricing.PricedItemITF, error) {
return &PricedResource{
return &PricedResource[*pricing.ExploitPricingProfile[pricing.TimePricingStrategy]]{
Name: w.Name,
Logo: w.Logo,
ResourceID: w.UUID,

View File

@@ -16,11 +16,11 @@ type BookingConfiguration struct {
Mode booking.BookingMode `json:"mode,omitempty" bson:"mode,omitempty"`
}
type PricedResource struct {
type PricedResource[T pricing.PricingProfileITF] struct {
Name string `json:"name,omitempty" bson:"name,omitempty"`
Logo string `json:"logo,omitempty" bson:"logo,omitempty"`
InstancesRefs map[string]string `json:"instances_refs,omitempty" bson:"instances_refs,omitempty"`
SelectedPricing pricing.PricingProfileITF `json:"selected_pricing,omitempty" bson:"selected_pricing,omitempty"`
SelectedPricing T `json:"selected_pricing,omitempty" bson:"selected_pricing,omitempty"`
Quantity int `json:"quantity,omitempty" bson:"quantity,omitempty"`
BookingConfiguration *BookingConfiguration `json:"booking_configuration,omitempty" bson:"booking_configuration,omitempty"`
Variations []*pricing.PricingVariation `json:"pricing_variations" bson:"pricing_variations"`
@@ -31,101 +31,107 @@ type PricedResource struct {
ResourceType tools.DataType `json:"resource_type,omitempty" bson:"resource_type,omitempty"`
}
func (abs *PricedResource) GetQuantity() int {
func (abs *PricedResource[T]) GetQuantity() int {
return abs.Quantity
}
func (abs *PricedResource) AddQuantity(amount int) {
func (abs *PricedResource[T]) AddQuantity(amount int) {
abs.Quantity += amount
}
func (abs *PricedResource) SelectPricing() pricing.PricingProfileITF {
func (abs *PricedResource[T]) SelectPricing() pricing.PricingProfileITF {
return abs.SelectedPricing
}
func (abs *PricedResource) GetID() string {
func (abs *PricedResource[T]) GetID() string {
return abs.ResourceID
}
func (abs *PricedResource) GetInstanceID() string {
func (abs *PricedResource[T]) GetInstanceID() string {
return abs.InstanceID
}
func (abs *PricedResource) GetType() tools.DataType {
func (abs *PricedResource[T]) GetType() tools.DataType {
return abs.ResourceType
}
func (abs *PricedResource) GetCreatorID() string {
func (abs *PricedResource[T]) GetCreatorID() string {
return abs.CreatorID
}
func (abs *PricedResource) IsPurchasable() bool {
if abs.SelectedPricing == nil {
// IsPurchasable and IsBooked fall back to false when SelectedPricing is a nil interface.
// Concrete types (PricedComputeResource, etc.) override these and guarantee non-nil pricing.
func (abs *PricedResource[T]) IsPurchasable() bool {
if any(abs.SelectedPricing) == nil {
return false
}
return (abs.SelectedPricing).IsPurchasable()
return abs.SelectedPricing.IsPurchasable()
}
func (abs *PricedResource) IsBooked() bool {
if abs.SelectedPricing == nil {
func (abs *PricedResource[T]) IsBooked() bool {
if any(abs.SelectedPricing) == nil {
return false
}
return (abs.SelectedPricing).IsBooked()
return abs.SelectedPricing.IsBooked()
}
func (abs *PricedResource) GetLocationEnd() *time.Time {
func (abs *PricedResource[T]) GetLocationEnd() *time.Time {
if abs.BookingConfiguration == nil {
return nil
}
return abs.BookingConfiguration.UsageEnd
}
func (abs *PricedResource) GetLocationStart() *time.Time {
func (abs *PricedResource[T]) GetLocationStart() *time.Time {
if abs.BookingConfiguration == nil {
return nil
}
return abs.BookingConfiguration.UsageStart
}
func (abs *PricedResource) SetLocationStart(start time.Time) {
func (abs *PricedResource[T]) SetLocationStart(start time.Time) {
if abs.BookingConfiguration == nil {
abs.BookingConfiguration = &BookingConfiguration{}
}
abs.BookingConfiguration.UsageStart = &start
}
func (abs *PricedResource) SetLocationEnd(end time.Time) {
func (abs *PricedResource[T]) SetLocationEnd(end time.Time) {
if abs.BookingConfiguration == nil {
abs.BookingConfiguration = &BookingConfiguration{}
}
abs.BookingConfiguration.UsageEnd = &end
}
func (abs *PricedResource) GetBookingMode() booking.BookingMode {
func (abs *PricedResource[T]) GetBookingMode() booking.BookingMode {
if abs.BookingConfiguration == nil {
return booking.WHEN_POSSIBLE
}
return abs.BookingConfiguration.Mode
}
func (abs *PricedResource) GetExplicitDurationInS() float64 {
func (abs *PricedResource[T]) GetExplicitDurationInS() float64 {
if abs.BookingConfiguration == nil {
abs.BookingConfiguration = &BookingConfiguration{}
}
if abs.BookingConfiguration.ExplicitBookingDurationS == 0 {
if abs.BookingConfiguration.UsageEnd == nil && abs.BookingConfiguration.UsageStart == nil {
return time.Duration(1 * time.Hour).Seconds()
return (5 * time.Minute).Seconds()
}
if abs.BookingConfiguration.UsageEnd == nil {
add := abs.BookingConfiguration.UsageStart.Add(time.Duration(1 * time.Hour))
add := abs.BookingConfiguration.UsageStart.Add(5 * time.Minute)
abs.BookingConfiguration.UsageEnd = &add
}
return abs.BookingConfiguration.UsageEnd.Sub(*abs.BookingConfiguration.UsageStart).Seconds()
d := abs.BookingConfiguration.UsageEnd.Sub(*abs.BookingConfiguration.UsageStart).Seconds()
if d <= 0 {
return (5 * time.Minute).Seconds()
}
return d
}
return abs.BookingConfiguration.ExplicitBookingDurationS
}
func (r *PricedResource) GetPriceHT() (float64, error) {
func (r *PricedResource[T]) GetPriceHT() (float64, error) {
now := time.Now()
if r.BookingConfiguration == nil {
r.BookingConfiguration = &BookingConfiguration{}
@@ -134,11 +140,11 @@ func (r *PricedResource) GetPriceHT() (float64, error) {
r.BookingConfiguration.UsageStart = &now
}
if r.BookingConfiguration.UsageEnd == nil {
add := r.BookingConfiguration.UsageStart.Add(time.Duration(1 * time.Hour))
add := r.BookingConfiguration.UsageStart.Add(time.Duration(5 * time.Minute))
r.BookingConfiguration.UsageEnd = &add
}
if r.SelectedPricing == nil {
return 0, errors.New("pricing profile must be set on Priced Resource " + r.ResourceID)
if any(r.SelectedPricing) == nil {
return 0, errors.New("pricing profile must be set for resource " + r.ResourceID)
}
pricing := r.SelectedPricing
return pricing.GetPriceHT(1, 0, *r.BookingConfiguration.UsageStart, *r.BookingConfiguration.UsageEnd, r.Variations)

View File

@@ -1,6 +1,7 @@
package resources
import (
"errors"
"time"
"cloud.o-forge.io/core/oc-lib/models/common/enum"
@@ -65,10 +66,31 @@ type ProcessingResourcePartnership struct {
}
type PricedProcessingResource struct {
PricedResource
PricedResource[*ProcessingResourcePricingProfile]
IsService bool
}
func (r *PricedProcessingResource) ensurePricing() {
if r.SelectedPricing == nil {
r.SelectedPricing = &ProcessingResourcePricingProfile{}
}
}
func (r *PricedProcessingResource) IsPurchasable() bool {
r.ensurePricing()
return r.SelectedPricing.IsPurchasable()
}
func (r *PricedProcessingResource) IsBooked() bool {
r.ensurePricing()
return r.SelectedPricing.IsBooked()
}
func (r *PricedProcessingResource) GetPriceHT() (float64, error) {
r.ensurePricing()
return r.PricedResource.GetPriceHT()
}
func (r *PricedProcessingResource) GetType() tools.DataType {
return tools.PROCESSING_RESOURCE
}
@@ -82,7 +104,7 @@ func (a *PricedProcessingResource) GetExplicitDurationInS() float64 {
if a.IsService {
return -1
}
return time.Duration(1 * time.Hour).Seconds()
return (5 * time.Minute).Seconds()
}
return a.BookingConfiguration.UsageEnd.Sub(*a.BookingConfiguration.UsageStart).Seconds()
}
@@ -93,6 +115,20 @@ func (d *ProcessingResource) GetAccessor(request *tools.APIRequest) utils.Access
return NewAccessor[*ProcessingResource](tools.PROCESSING_RESOURCE, request, func() utils.DBObject { return &ProcessingResource{} }) // Create a new instance of the accessor
}
func (abs *ProcessingResource) ConvertToPricedResource(t tools.DataType, selectedInstance *int, selectedPartnership *int, selectedBuyingStrategy *int, selectedStrategy *int, selectedBookingModeIndex *int, request *tools.APIRequest) (pricing.PricedItemITF, error) {
if t != tools.PROCESSING_RESOURCE {
return nil, errors.New("not the proper type expected : cannot convert to priced resource : have " + t.String() + " wait Data")
}
p, err := ConvertToPricedResource[*DataResourcePricingProfile](t, selectedInstance, selectedPartnership, selectedBuyingStrategy, selectedStrategy, selectedBookingModeIndex, abs, request)
if err != nil {
return nil, err
}
priced := p.(*PricedResource[*DataResourcePricingProfile])
return &PricedDataResource{
PricedResource: *priced,
}, nil
}
type ProcessingResourcePricingProfile struct {
pricing.AccessPricingProfile[pricing.TimePricingStrategy] // AccessPricingProfile is the pricing profile of a data it means that we can access the data for an amount of time
}

View File

@@ -21,8 +21,7 @@ type PurchaseResource struct {
ResourceType tools.DataType `json:"resource_type" bson:"resource_type" validate:"required"`
// Authorization: identifies who created this draft and the Check session it belongs to.
SchedulerPeerID string `json:"scheduler_peer_id,omitempty" bson:"scheduler_peer_id,omitempty"`
SchedulingSessionID string `json:"scheduling_session_id,omitempty" bson:"scheduling_session_id,omitempty"`
SchedulerPeerID string `json:"scheduler_peer_id,omitempty" bson:"scheduler_peer_id,omitempty"`
}
func (d *PurchaseResource) GetAccessor(request *tools.APIRequest) utils.Accessor {

View File

@@ -36,8 +36,8 @@ func TestCanUpdate(t *testing.T) {
func TestCanDelete(t *testing.T) {
now := time.Now().UTC()
past := now.Add(-1 * time.Hour)
future := now.Add(1 * time.Hour)
past := now.Add(-5 * time.Minute)
future := now.Add(5 * time.Minute)
t.Run("nil EndDate", func(t *testing.T) {
r := &purchase_resource.PurchaseResource{}

View File

@@ -21,11 +21,11 @@ import (
type AbstractResource struct {
utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name)
Type string `json:"type,omitempty" bson:"type,omitempty"` // Type is the type of the resource
Logo string `json:"logo,omitempty" bson:"logo,omitempty" validate:"required"` // Logo is the logo of the resource
Description string `json:"description,omitempty" bson:"description,omitempty"` // Description is the description of the resource
ShortDescription string `json:"short_description,omitempty" bson:"short_description,omitempty" validate:"required"` // ShortDescription is the short description of the resource
Owners []utils.Owner `json:"owners,omitempty" bson:"owners,omitempty"` // Owners is the list of owners of the resource
Type string `json:"type,omitempty" bson:"type,omitempty"` // Type is the type of the resource
Logo string `json:"logo,omitempty" bson:"logo,omitempty"` // Logo is the logo of the resource
Description string `json:"description,omitempty" bson:"description,omitempty"` // Description is the description of the resource
ShortDescription string `json:"short_description,omitempty" bson:"short_description,omitempty"` // ShortDescription is the short description of the resource
Owners []utils.Owner `json:"owners,omitempty" bson:"owners,omitempty"` // Owners is the list of owners of the resource
UsageRestrictions string `bson:"usage_restrictions,omitempty" json:"usage_restrictions,omitempty"`
AllowedBookingModes map[booking.BookingMode]*pricing.PricingVariation `bson:"allowed_booking_modes" json:"allowed_booking_modes"`
}
@@ -66,16 +66,16 @@ func (r *AbstractResource) CanDelete() bool {
type AbstractInstanciatedResource[T ResourceInstanceITF] struct {
AbstractResource // AbstractResource contains the basic fields of an object (id, name)
Instances []T `json:"instances,omitempty" bson:"instances,omitempty"` // Bill is the bill of the resource // Bill is the bill of the resource
Instances []T `json:"instances,omitempty" bson:"instances,omitempty"`
}
func (abs *AbstractInstanciatedResource[T]) AddInstances(instance ResourceInstanceITF) {
abs.Instances = append(abs.Instances, instance.(T))
}
func (abs *AbstractInstanciatedResource[T]) ConvertToPricedResource(t tools.DataType,
func ConvertToPricedResource[T pricing.PricingProfileITF](t tools.DataType,
selectedInstance *int, selectedPartnership *int, selectedBuyingStrategy *int, selectedStrategy *int,
selectedBookingModeIndex *int, request *tools.APIRequest) (pricing.PricedItemITF, error) {
selectedBookingModeIndex *int, abs ResourceInterface, request *tools.APIRequest) (pricing.PricedItemITF, error) {
instances := map[string]string{}
var profile pricing.PricingProfileITF
var inst ResourceInstanceITF
@@ -84,7 +84,7 @@ func (abs *AbstractInstanciatedResource[T]) ConvertToPricedResource(t tools.Data
instances[t.GetID()] = t.GetName()
profile = t.GetProfile(request.PeerID, selectedPartnership, selectedBuyingStrategy, selectedStrategy)
} else {
for i, instance := range abs.Instances { // TODO why it crush before ?
for i, instance := range abs.SetAllowedInstances(request) { // TODO why it crush before ?
if i == 0 {
inst = instance
}
@@ -106,20 +106,33 @@ func (abs *AbstractInstanciatedResource[T]) ConvertToPricedResource(t tools.Data
}*/
}
variations := []*pricing.PricingVariation{}
if selectedBookingModeIndex != nil && abs.AllowedBookingModes[booking.BookingMode(*selectedBookingModeIndex)] != nil {
variations = append(variations, abs.AllowedBookingModes[booking.BookingMode(*selectedBookingModeIndex)])
if selectedBookingModeIndex != nil && abs.GetBookingModes()[booking.BookingMode(*selectedBookingModeIndex)] != nil {
variations = append(variations, abs.GetBookingModes()[booking.BookingMode(*selectedBookingModeIndex)])
}
return &PricedResource{
Name: abs.Name,
Logo: abs.Logo,
ResourceID: abs.UUID,
InstanceID: inst.GetID(),
ResourceType: t,
Quantity: 1,
InstancesRefs: instances,
SelectedPricing: profile,
Variations: variations,
CreatorID: abs.CreatorID,
// Seed the booking configuration with the instance's historical average duration
// so GetExplicitDurationInS() returns a realistic default out of the box.
var bc *BookingConfiguration
if inst != nil {
if avg := inst.GetAverageDurationS(); avg > 0 {
bc = &BookingConfiguration{ExplicitBookingDurationS: avg}
}
}
instanceID := ""
if inst != nil {
instanceID = inst.GetID()
}
selectedPricing, _ := profile.(T)
return &PricedResource[T]{
Name: abs.GetName(),
ResourceID: abs.GetID(),
InstanceID: instanceID,
ResourceType: t,
Quantity: 1,
InstancesRefs: instances,
SelectedPricing: selectedPricing,
Variations: variations,
CreatorID: abs.GetCreatorID(),
BookingConfiguration: bc,
}, nil
}
@@ -140,11 +153,16 @@ func (r *AbstractInstanciatedResource[T]) GetSelectedInstance(selected *int) Res
return nil
}
func (abs *AbstractInstanciatedResource[T]) SetAllowedInstances(request *tools.APIRequest, instanceID ...string) {
if (request != nil && request.PeerID == abs.CreatorID && request.PeerID != "") || request.Admin {
return
func (abs *AbstractInstanciatedResource[T]) SetAllowedInstances(request *tools.APIRequest, instanceID ...string) []ResourceInstanceITF {
if !((request != nil && request.PeerID == abs.CreatorID && request.PeerID != "") || request.Admin) {
abs.Instances = VerifyAuthAction(abs.Instances, request, instanceID...)
}
abs.Instances = VerifyAuthAction(abs.Instances, request, instanceID...)
inst := []ResourceInstanceITF{}
for _, i := range abs.Instances {
inst = append(inst, i)
}
return inst
}
func (abs *AbstractInstanciatedResource[T]) VerifyAuth(callName string, request *tools.APIRequest) bool {
@@ -196,6 +214,9 @@ type ResourceInstance[T ResourcePartnerITF] struct {
Outputs []models.Param `json:"outputs,omitempty" bson:"outputs,omitempty"`
Partnerships []T `json:"partnerships,omitempty" bson:"partnerships,omitempty"`
AverageDurationS float64 `json:"average_duration_s,omitempty" bson:"average_duration_s,omitempty"`
AverageDurationSamples int `json:"average_duration_samples,omitempty" bson:"average_duration_samples,omitempty"`
}
// TODO should kicks all selection
@@ -285,6 +306,17 @@ func (ri *ResourceInstance[T]) ClearPeerGroups() {
}
}
func (ri *ResourceInstance[T]) GetAverageDurationS() float64 {
return ri.AverageDurationS
}
func (ri *ResourceInstance[T]) UpdateAverageDuration(actualS float64) {
buffered := actualS * 1.20
n := float64(ri.AverageDurationSamples)
ri.AverageDurationS = (ri.AverageDurationS*n + buffered) / (n + 1)
ri.AverageDurationSamples++
}
type ResourcePartnerShip[T pricing.PricingProfileITF] struct {
Namespace string `json:"namespace" bson:"namespace" default:"default-namespace"`
PeerGroups map[string][]string `json:"peer_groups,omitempty" bson:"peer_groups,omitempty"`

View File

@@ -2,7 +2,6 @@ package resources
import (
"errors"
"fmt"
"time"
"cloud.o-forge.io/core/oc-lib/models/common/enum"
@@ -35,11 +34,11 @@ func (abs *StorageResource) ConvertToPricedResource(t tools.DataType, selectedIn
if t != tools.STORAGE_RESOURCE {
return nil, errors.New("not the proper type expected : cannot convert to priced resource : have " + t.String() + " wait Storage")
}
p, err := abs.AbstractInstanciatedResource.ConvertToPricedResource(t, selectedInstance, selectedPartnership, selectedBuyingStrategy, selectedStrategy, selectedBookingModeIndex, request)
p, err := ConvertToPricedResource[*StorageResourcePricingProfile](t, selectedInstance, selectedPartnership, selectedBuyingStrategy, selectedStrategy, selectedBookingModeIndex, abs, request)
if err != nil {
return nil, err
}
priced := p.(*PricedResource)
priced := p.(*PricedResource[*StorageResourcePricingProfile])
return &PricedStorageResource{
PricedResource: *priced,
}, nil
@@ -181,30 +180,43 @@ func (p *StorageResourcePricingProfile) IsBooked() bool {
}
type PricedStorageResource struct {
PricedResource
PricedResource[*StorageResourcePricingProfile]
UsageStorageGB float64 `json:"storage_gb,omitempty" bson:"storage_gb,omitempty"`
}
func (r *PricedStorageResource) ensurePricing() {
if r.SelectedPricing == nil {
r.SelectedPricing = &StorageResourcePricingProfile{}
}
}
func (r *PricedStorageResource) IsPurchasable() bool {
r.ensurePricing()
return r.SelectedPricing.IsPurchasable()
}
func (r *PricedStorageResource) IsBooked() bool {
r.ensurePricing()
return r.SelectedPricing.IsBooked()
}
func (r *PricedStorageResource) GetType() tools.DataType {
return tools.STORAGE_RESOURCE
}
func (r *PricedStorageResource) GetPriceHT() (float64, error) {
r.ensurePricing()
if r.BookingConfiguration == nil {
r.BookingConfiguration = &BookingConfiguration{}
}
fmt.Println("GetPriceHT", r.BookingConfiguration.UsageStart, r.BookingConfiguration.UsageEnd)
now := time.Now()
if r.BookingConfiguration.UsageStart == nil {
r.BookingConfiguration.UsageStart = &now
}
if r.BookingConfiguration.UsageEnd == nil {
add := r.BookingConfiguration.UsageStart.Add(time.Duration(1 * time.Hour))
add := r.BookingConfiguration.UsageStart.Add(time.Duration(5 * time.Minute))
r.BookingConfiguration.UsageEnd = &add
}
if r.SelectedPricing == nil {
return 0, errors.New("pricing profile must be set on Priced Storage" + r.ResourceID)
}
pricing := r.SelectedPricing
var err error
amountOfData := float64(1)

View File

@@ -37,7 +37,7 @@ func TestComputeResource_ConvertToPricedResource(t *testing.T) {
func TestComputeResourcePricingProfile_GetPriceHT_CPUs(t *testing.T) {
start := time.Now()
end := start.Add(1 * time.Hour)
end := start.Add(5 * time.Minute)
profile := resources.ComputeResourcePricingProfile{
CPUsPrices: map[string]float64{"Xeon": 2.0},
ExploitPricingProfile: pricing.ExploitPricingProfile[pricing.TimePricingStrategy]{
@@ -61,11 +61,18 @@ func TestComputeResourcePricingProfile_GetPriceHT_InvalidParams(t *testing.T) {
func TestPricedComputeResource_GetPriceHT(t *testing.T) {
start := time.Now()
end := start.Add(1 * time.Hour)
end := start.Add(5 * time.Minute)
r := resources.PricedComputeResource{
PricedResource: resources.PricedResource{
ResourceID: "comp456",
SelectedPricing: &MockPricingProfile{ReturnCost: 1.0},
PricedResource: resources.PricedResource[*resources.ComputeResourcePricingProfile]{
ResourceID: "comp456",
SelectedPricing: &resources.ComputeResourcePricingProfile{
CPUsPrices: map[string]float64{"Xeon": 2.0},
ExploitPricingProfile: pricing.ExploitPricingProfile[pricing.TimePricingStrategy]{
AccessPricingProfile: pricing.AccessPricingProfile[pricing.TimePricingStrategy]{
Pricing: pricing.PricingStrategy[pricing.TimePricingStrategy]{Price: 1.0},
},
},
},
BookingConfiguration: &resources.BookingConfiguration{
UsageStart: &start,
UsageEnd: &end,
@@ -73,8 +80,8 @@ func TestPricedComputeResource_GetPriceHT(t *testing.T) {
},
},
CPUsLocated: map[string]float64{"Xeon": 2},
GPUsLocated: map[string]float64{"Tesla": 1},
RAMLocated: 4,
GPUsLocated: map[string]float64{},
RAMLocated: 0,
}
price, err := r.GetPriceHT()
@@ -84,7 +91,7 @@ func TestPricedComputeResource_GetPriceHT(t *testing.T) {
func TestPricedComputeResource_GetPriceHT_MissingProfile(t *testing.T) {
r := resources.PricedComputeResource{
PricedResource: resources.PricedResource{
PricedResource: resources.PricedResource[*resources.ComputeResourcePricingProfile]{
ResourceID: "comp789",
},
}

View File

@@ -76,13 +76,13 @@ func TestDataResourcePricingStrategy_GetQuantity(t *testing.T) {
func TestDataResourcePricingProfile_IsPurchased(t *testing.T) {
profile := &resources.DataResourcePricingProfile{}
profile.Pricing.BuyingStrategy = pricing.SUBSCRIPTION
profile.Pricing.BuyingStrategy = pricing.PERMANENT
assert.True(t, profile.IsPurchasable())
}
func TestPricedDataResource_GetPriceHT(t *testing.T) {
now := time.Now()
later := now.Add(1 * time.Hour)
later := now.Add(5 * time.Minute)
mockPrice := 42.0
pricingProfile := &resources.DataResourcePricingProfile{AccessPricingProfile: pricing.AccessPricingProfile[resources.DataResourcePricingStrategy]{
@@ -91,7 +91,7 @@ func TestPricedDataResource_GetPriceHT(t *testing.T) {
pricingProfile.Pricing.OverrideStrategy = resources.PER_GB_DOWNLOADED
r := &resources.PricedDataResource{
PricedResource: resources.PricedResource{
PricedResource: resources.PricedResource[*resources.DataResourcePricingProfile]{
SelectedPricing: pricingProfile,
BookingConfiguration: &resources.BookingConfiguration{
UsageStart: &now,
@@ -107,7 +107,7 @@ func TestPricedDataResource_GetPriceHT(t *testing.T) {
func TestPricedDataResource_GetPriceHT_NoProfiles(t *testing.T) {
r := &resources.PricedDataResource{
PricedResource: resources.PricedResource{
PricedResource: resources.PricedResource[*resources.DataResourcePricingProfile]{
ResourceID: "test-resource",
},
}

View File

@@ -36,7 +36,7 @@ func (m *MockPricingProfile) GetPriceHT(amount float64, explicitDuration float64
// ---- Tests ----
func TestGetIDAndCreatorAndType(t *testing.T) {
r := resources.PricedResource{
r := resources.PricedResource[pricing.PricingProfileITF]{
ResourceID: "res-123",
CreatorID: "user-abc",
ResourceType: tools.DATA_RESOURCE,
@@ -48,23 +48,23 @@ func TestGetIDAndCreatorAndType(t *testing.T) {
func TestIsPurchased(t *testing.T) {
t.Run("nil selected pricing returns false", func(t *testing.T) {
r := &resources.PricedResource{}
r := &resources.PricedResource[pricing.PricingProfileITF]{}
assert.False(t, r.IsPurchasable())
})
t.Run("returns true if pricing profile is purchased", func(t *testing.T) {
mock := &MockPricingProfile{Purchased: true}
r := &resources.PricedResource{SelectedPricing: mock}
r := &resources.PricedResource[pricing.PricingProfileITF]{SelectedPricing: mock}
assert.True(t, r.IsPurchasable())
})
}
func TestGetAndSetLocationStartEnd(t *testing.T) {
r := &resources.PricedResource{}
r := &resources.PricedResource[pricing.PricingProfileITF]{}
now := time.Now()
r.SetLocationStart(now)
r.SetLocationEnd(now.Add(2 * time.Hour))
r.SetLocationEnd(now.Add(10 * time.Minute))
assert.Equal(t, now, *r.GetLocationStart())
assert.Equal(t, now.Add(2*time.Hour), *r.GetLocationEnd())
@@ -72,7 +72,7 @@ func TestGetAndSetLocationStartEnd(t *testing.T) {
func TestGetExplicitDurationInS(t *testing.T) {
t.Run("uses explicit duration if set", func(t *testing.T) {
r := &resources.PricedResource{BookingConfiguration: &resources.BookingConfiguration{
r := &resources.PricedResource[pricing.PricingProfileITF]{BookingConfiguration: &resources.BookingConfiguration{
ExplicitBookingDurationS: 3600,
},
}
@@ -81,8 +81,8 @@ func TestGetExplicitDurationInS(t *testing.T) {
t.Run("computes duration from start and end", func(t *testing.T) {
start := time.Now()
end := start.Add(2 * time.Hour)
r := &resources.PricedResource{
end := start.Add(10 * time.Minute)
r := &resources.PricedResource[pricing.PricingProfileITF]{
BookingConfiguration: &resources.BookingConfiguration{
UsageStart: &start, UsageEnd: &end,
},
@@ -91,14 +91,14 @@ func TestGetExplicitDurationInS(t *testing.T) {
})
t.Run("defaults to 1 hour when times not set", func(t *testing.T) {
r := &resources.PricedResource{}
r := &resources.PricedResource[pricing.PricingProfileITF]{}
assert.InDelta(t, 3600.0, r.GetExplicitDurationInS(), 0.1)
})
}
func TestGetPriceHT(t *testing.T) {
t.Run("returns error if no pricing profile", func(t *testing.T) {
r := &resources.PricedResource{ResourceID: "no-profile"}
r := &resources.PricedResource[pricing.PricingProfileITF]{ResourceID: "no-profile"}
price, err := r.GetPriceHT()
require.Error(t, err)
assert.Contains(t, err.Error(), "pricing profile must be set")
@@ -107,7 +107,7 @@ func TestGetPriceHT(t *testing.T) {
t.Run("defaults BookingConfiguration when nil", func(t *testing.T) {
mock := &MockPricingProfile{ReturnCost: 42.0}
r := &resources.PricedResource{
r := &resources.PricedResource[pricing.PricingProfileITF]{
SelectedPricing: mock,
}
price, err := r.GetPriceHT()
@@ -117,9 +117,9 @@ func TestGetPriceHT(t *testing.T) {
t.Run("returns error if profile GetPriceHT fails", func(t *testing.T) {
start := time.Now()
end := start.Add(1 * time.Hour)
end := start.Add(5 * time.Minute)
mock := &MockPricingProfile{ReturnErr: true}
r := &resources.PricedResource{
r := &resources.PricedResource[pricing.PricingProfileITF]{
SelectedPricing: mock,
BookingConfiguration: &resources.BookingConfiguration{
UsageStart: &start,
@@ -133,9 +133,9 @@ func TestGetPriceHT(t *testing.T) {
t.Run("uses SelectedPricing if set", func(t *testing.T) {
start := time.Now()
end := start.Add(1 * time.Hour)
end := start.Add(5 * time.Minute)
mock := &MockPricingProfile{ReturnCost: 10.0}
r := &resources.PricedResource{
r := &resources.PricedResource[pricing.PricingProfileITF]{
SelectedPricing: mock,
BookingConfiguration: &resources.BookingConfiguration{
UsageStart: &start,

View File

@@ -23,7 +23,7 @@ func TestPricedProcessingResource_GetType(t *testing.T) {
func TestPricedProcessingResource_GetExplicitDurationInS(t *testing.T) {
now := time.Now()
after := now.Add(2 * time.Hour)
after := now.Add(10 * time.Minute)
tests := []struct {
name string
@@ -40,30 +40,30 @@ func TestPricedProcessingResource_GetExplicitDurationInS(t *testing.T) {
{
name: "Nil start time, non-service",
input: PricedProcessingResource{
PricedResource: PricedResource{
PricedResource: PricedResource[*ProcessingResourcePricingProfile]{
BookingConfiguration: &resources.BookingConfiguration{
UsageStart: nil,
},
},
},
expected: float64((1 * time.Hour).Seconds()),
expected: float64((5 * time.Minute).Seconds()),
},
{
name: "Duration computed from start and end",
input: PricedProcessingResource{
PricedResource: PricedResource{
PricedResource: PricedResource[*ProcessingResourcePricingProfile]{
BookingConfiguration: &resources.BookingConfiguration{
UsageStart: &now,
UsageEnd: &after,
},
},
},
expected: float64((2 * time.Hour).Seconds()),
expected: float64((10 * time.Minute).Seconds()),
},
{
name: "Explicit duration takes precedence",
input: PricedProcessingResource{
PricedResource: PricedResource{
PricedResource: PricedResource[*ProcessingResourcePricingProfile]{
BookingConfiguration: &resources.BookingConfiguration{
ExplicitBookingDurationS: 1337,
},
@@ -89,14 +89,14 @@ func TestProcessingResource_GetAccessor(t *testing.T) {
func TestProcessingResourcePricingProfile_GetPriceHT(t *testing.T) {
start := time.Now()
end := start.Add(2 * time.Hour)
end := start.Add(10 * time.Minute)
mockPricing := pricing.AccessPricingProfile[pricing.TimePricingStrategy]{
Pricing: pricing.PricingStrategy[pricing.TimePricingStrategy]{
Price: 100.0,
},
}
profile := &ProcessingResourcePricingProfile{AccessPricingProfile: mockPricing}
price, err := profile.GetPriceHT(0, 0, start, end, []*pricing.PricingVariation{})
price, err := profile.GetPriceHT(1, 0, start, end, []*pricing.PricingVariation{})
assert.NoError(t, err)
assert.Equal(t, 100.0, price)
}

View File

@@ -81,8 +81,8 @@ func TestGetSelectedInstance_NoIndex(t *testing.T) {
}
func TestCanUpdate_WhenOnlyStateDiffers(t *testing.T) {
resource := &resources.AbstractResource{AbstractObject: utils.AbstractObject{IsDraft: false}}
set := &MockDBObject{isDraft: true}
resource := &resources.AbstractResource{AbstractObject: utils.AbstractObject{IsDraft: true}}
set := &MockDBObject{isDraft: false}
canUpdate, updated := resource.CanUpdate(set)
assert.True(t, canUpdate)
assert.Equal(t, set, updated)
@@ -105,8 +105,13 @@ type FakeResource struct {
resources.AbstractInstanciatedResource[*MockInstance]
}
func (f *FakeResource) SetAllowedInstances(*tools.APIRequest, ...string) {}
func (f *FakeResource) VerifyAuth(string, *tools.APIRequest) bool { return true }
func (f *FakeResource) SetAllowedInstances(req *tools.APIRequest, instance_id ...string) []resources.ResourceInstanceITF {
return nil
}
func (f *FakeResource) ConvertToPricedResource(t tools.DataType, a *int, b *int, c *int, d *int, e *int, req *tools.APIRequest) (pricing.PricedItemITF, error) {
return nil, nil
}
func (f *FakeResource) VerifyAuth(string, *tools.APIRequest) bool { return true }
func TestNewAccessor_ReturnsValid(t *testing.T) {
acc := resources.NewAccessor[*FakeResource](tools.COMPUTE_RESOURCE, &tools.APIRequest{}, func() utils.DBObject {

View File

@@ -96,7 +96,7 @@ func TestStorageResourcePricingStrategy_GetQuantity_Invalid(t *testing.T) {
func TestPricedStorageResource_GetPriceHT_NoProfiles(t *testing.T) {
res := &resources.PricedStorageResource{
PricedResource: resources.PricedResource{
PricedResource: resources.PricedResource[*resources.StorageResourcePricingProfile]{
ResourceID: "res-id",
},
}

View File

@@ -30,8 +30,9 @@ func (d *WorkflowResource) ClearEnv() utils.DBObject {
return d
}
func (w *WorkflowResource) SetAllowedInstances(request *tools.APIRequest, ids ...string) {
func (w *WorkflowResource) SetAllowedInstances(request *tools.APIRequest, ids ...string) []ResourceInstanceITF {
/* EMPTY */
return []ResourceInstanceITF{}
}
func (r *WorkflowResource) GetSelectedInstance(selected *int) ResourceInstanceITF {
@@ -39,7 +40,7 @@ func (r *WorkflowResource) GetSelectedInstance(selected *int) ResourceInstanceIT
}
func (w *WorkflowResource) ConvertToPricedResource(t tools.DataType, selectedInstance *int, selectedPartnership *int, selectedBuyingStrategy *int, selectedStrategy *int, selectedBookingModeIndex *int, request *tools.APIRequest) (pricing.PricedItemITF, error) {
return &PricedResource{
return &PricedResource[*pricing.ExploitPricingProfile[pricing.TimePricingStrategy]]{
Name: w.Name,
Logo: w.Logo,
ResourceID: w.UUID,

View File

@@ -176,6 +176,10 @@ type AbstractAccessor[T DBObject] struct {
NotImplemented []string
}
func (r *AbstractAccessor[T]) NewObj() DBObject {
return r.New()
}
func (r *AbstractAccessor[T]) ShouldVerifyAuth() bool {
return true
}

View File

@@ -32,7 +32,6 @@ func GenericStoreOne(data DBObject, a Accessor) (DBObject, int, error) {
if data.GetID() == "" {
data.GenerateID()
}
data.SetID(data.GetID())
data.StoreDraftDefault()
data.UpToDate(a.GetUser(), a.GetPeerID(), true)
data.Unsign()
@@ -98,7 +97,7 @@ func ModelGenericUpdateOne(change map[string]interface{}, id string, a Accessor)
if err != nil {
return nil, nil, c, err
}
obj := &AbstractObject{}
obj := a.NewObj()
b, _ := json.Marshal(r)
json.Unmarshal(b, obj)
ok, r := r.CanUpdate(obj)

View File

@@ -40,6 +40,7 @@ type DBObject interface {
// Accessor is an interface that defines the basic methods for an Accessor
type Accessor interface {
NewObj() DBObject
GetUser() string
GetPeerID() string
GetGroups() []string

View File

@@ -1,8 +1,6 @@
package graph
import (
"time"
"cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/tools"
)
@@ -67,46 +65,32 @@ func (wf *Graph) IsWorkflow(item GraphItem) bool {
return item.Workflow != nil
}
func (g *Graph) GetAverageTimeRelatedToProcessingActivity(start time.Time, processings []*resources.ProcessingResource, resource resources.ResourceInterface,
func (g *Graph) GetAverageTimeRelatedToProcessingActivity(processings []*resources.ProcessingResource, resource resources.ResourceInterface,
f func(GraphItem) resources.ResourceInterface, instance int, partnership int, buying int, strategy int, bookingMode int, request *tools.APIRequest) (float64, float64, error) {
nearestStart := float64(10000000000)
oneIsInfinite := false
longestDuration := float64(0)
for _, link := range g.Links {
for _, processing := range processings {
var source string // source is the source of the link
if link.Destination.ID == processing.GetID() && f(g.Items[link.Source.ID]) != nil && f(g.Items[link.Source.ID]).GetID() == resource.GetID() { // if the destination is the processing and the source is not a compute
source = link.Source.ID
} else if link.Source.ID == processing.GetID() && f(g.Items[link.Source.ID]) != nil && f(g.Items[link.Source.ID]).GetID() == resource.GetID() { // if the source is the processing and the destination is not a compute
source = link.Destination.ID
if !(link.Destination.ID == processing.GetID() && f(g.Items[link.Source.ID]) != nil && f(g.Items[link.Source.ID]).GetID() == resource.GetID()) &&
!(link.Source.ID == processing.GetID() && f(g.Items[link.Source.ID]) != nil && f(g.Items[link.Source.ID]).GetID() == resource.GetID()) {
continue
}
priced, err := processing.ConvertToPricedResource(tools.PROCESSING_RESOURCE, &instance, &partnership, &buying, &strategy, &bookingMode, request)
if err != nil {
return 0, 0, err
}
if source != "" {
if priced.GetLocationStart() != nil {
near := float64(priced.GetLocationStart().Sub(start).Seconds())
if near < nearestStart {
nearestStart = near
}
}
if priced.GetLocationEnd() != nil {
duration := float64(priced.GetLocationEnd().Sub(*priced.GetLocationStart()).Seconds())
if longestDuration < duration {
longestDuration = duration
}
} else {
oneIsInfinite = true
}
duration := priced.GetExplicitDurationInS()
if duration < 0 {
oneIsInfinite = true
} else if longestDuration < duration {
longestDuration = duration
}
}
}
if oneIsInfinite {
return nearestStart, -1, nil
return 0, -1, nil
}
return nearestStart, longestDuration, nil
return 0, longestDuration, nil
}
/*
@@ -155,7 +139,7 @@ func (g *Graph) GetAverageTimeProcessingBeforeStart(average float64, processingI
func (g *Graph) GetResource(id string) (tools.DataType, resources.ResourceInterface) {
if item, ok := g.Items[id]; ok {
if item.Data != nil {
if item.NativeTool != nil {
return tools.NATIVE_TOOL, item.NativeTool
} else if item.Data != nil {
return tools.DATA_RESOURCE, item.Data

282
models/workflow/plantuml.go Normal file
View File

@@ -0,0 +1,282 @@
package workflow
import (
"encoding/json"
"fmt"
"sort"
"strconv"
"strings"
"cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/models/workflow/graph"
)
// ---------------------------------------------------------------------------
// PlantUML export
// ---------------------------------------------------------------------------
// plantUMLProcedures defines !procedure blocks for each resource type.
// Parameters use the $var/$name convention of PlantUML preprocessor v2.
// Calls are written WITHOUT inline comments (comment on the following line)
// to avoid the "assumed sequence diagram" syntax error.
const plantUMLProcedures = `!procedure Processing($var, $name)
component "$name" as $var <<Processing>>
!endprocedure
!procedure Data($var, $name)
file "$name" as $var <<Data>>
!endprocedure
!procedure Storage($var, $name)
database "$name" as $var <<Storage>>
!endprocedure
!procedure ComputeUnit($var, $name)
node "$name" as $var <<ComputeUnit>>
!endprocedure
!procedure WorkflowEvent($var, $name)
usecase "$name" as $var <<WorkflowEvent>>
!endprocedure
!procedure Workflow($var, $name)
frame "$name" as $var <<Workflow>>
!endprocedure
`
// ToPlantUML serializes the workflow graph to a valid, renderable PlantUML file
// that is also compatible with ExtractFromPlantUML (round-trip).
// Resource and instance attributes are written as human-readable comments:
//
// Processing(p1, "NDVI") ' access.container.image: myrepo/ndvi:1.2, infrastructure: 0
func (w *Workflow) ToPlantUML() string {
var sb strings.Builder
sb.WriteString("@startuml\n\n")
sb.WriteString(plantUMLProcedures)
sb.WriteByte('\n')
varNames := plantUMLVarNames(w.Graph.Items)
// --- resource declarations ---
for id, item := range w.Graph.Items {
if line := plantUMLItemLine(varNames[id], item); line != "" {
sb.WriteString(line + "\n")
}
}
sb.WriteByte('\n')
// --- links ---
for _, link := range w.Graph.Links {
src := varNames[link.Source.ID]
dst := varNames[link.Destination.ID]
if src == "" || dst == "" {
continue
}
sb.WriteString(fmt.Sprintf("%s --> %s\n", src, dst))
if comment := plantUMLLinkComment(link); comment != "" {
sb.WriteString("' " + comment + "\n")
}
}
sb.WriteString("\n@enduml\n")
return sb.String()
}
// plantUMLVarNames assigns short, deterministic variable names to each graph
// item (d1, d2, p1, s1, c1, e1, wf1 …).
func plantUMLVarNames(items map[string]graph.GraphItem) map[string]string {
counters := map[string]int{}
varNames := map[string]string{}
// Sort IDs for deterministic output
ids := make([]string, 0, len(items))
for id := range items {
ids = append(ids, id)
}
sort.Strings(ids)
for _, id := range ids {
prefix := plantUMLPrefix(items[id])
counters[prefix]++
varNames[id] = fmt.Sprintf("%s%d", prefix, counters[prefix])
}
return varNames
}
func plantUMLPrefix(item graph.GraphItem) string {
switch {
case item.NativeTool != nil:
return "e"
case item.Data != nil:
return "d"
case item.Processing != nil:
return "p"
case item.Storage != nil:
return "s"
case item.Compute != nil:
return "c"
case item.Workflow != nil:
return "wf"
}
return "u"
}
// plantUMLItemLine builds the PlantUML declaration line for one graph item.
func plantUMLItemLine(varName string, item graph.GraphItem) string {
switch {
case item.NativeTool != nil:
// WorkflowEvent has no instance and no configurable attributes.
return fmt.Sprintf("WorkflowEvent(%s, \"%s\")", varName, item.NativeTool.GetName())
case item.Data != nil:
return plantUMLResourceLine("Data", varName, item.Data)
case item.Processing != nil:
return plantUMLResourceLine("Processing", varName, item.Processing)
case item.Storage != nil:
return plantUMLResourceLine("Storage", varName, item.Storage)
case item.Compute != nil:
return plantUMLResourceLine("ComputeUnit", varName, item.Compute)
case item.Workflow != nil:
return plantUMLResourceLine("Workflow", varName, item.Workflow)
}
return ""
}
func plantUMLResourceLine(macro, varName string, res resources.ResourceInterface) string {
decl := fmt.Sprintf("%s(%s, \"%s\")", macro, varName, res.GetName())
if comment := plantUMLResourceComment(res); comment != "" {
// Comment on the line AFTER the declaration. ExtractFromPlantUML uses
// look-ahead to merge it back. No inline comment = no !procedure conflict.
return decl + "\n' " + comment
}
return decl
}
// plantUMLResourceComment merges resource-level fields with the first instance
// fields (instance overrides resource) and formats them as human-readable pairs.
func plantUMLResourceComment(res resources.ResourceInterface) string {
m := plantUMLToFlatMap(res)
if inst := res.GetSelectedInstance(nil); inst != nil {
for k, v := range plantUMLToFlatMap(inst) {
m[k] = v
}
}
return plantUMLFlatMapToComment(m)
}
// plantUMLLinkComment serializes StorageLinkInfos (first entry) as flat
// human-readable pairs prefixed with "storage_link_infos.".
func plantUMLLinkComment(link graph.GraphLink) string {
if len(link.StorageLinkInfos) == 0 {
return ""
}
infoFlat := plantUMLToFlatMap(link.StorageLinkInfos[0])
prefixed := make(map[string]string, len(infoFlat))
for k, v := range infoFlat {
prefixed["storage_link_infos."+k] = v
}
return plantUMLFlatMapToComment(prefixed)
}
// ---------------------------------------------------------------------------
// Flat-map helpers (shared by import & export)
// ---------------------------------------------------------------------------
// plantUMLSkipFields lists JSON field names (root keys) that must never appear
// in human-readable comments. All names are the actual JSON tags, not Go field names.
var plantUMLSkipFields = map[string]bool{
// AbstractObject — identity & audit (json tags)
"id": true, "name": true, "is_draft": true, "access_mode": true, "signature": true,
"creator_id": true, "user_creator_id": true,
"creation_date": true, "update_date": true,
"updater_id": true, "user_updater_id": true,
// internal resource type identifier (AbstractResource.Type / GetType())
"type": true,
// relationships / pricing
"instances": true, "partnerships": true,
"allowed_booking_modes": true, "usage_restrictions": true,
// display / admin
"logo": true, "description": true, "short_description": true, "owners": true,
// runtime params
"env": true, "inputs": true, "outputs": true,
// NativeTool internals
"kind": true, "params": true,
}
// zeroTimeStr is the JSON representation of Go's zero time.Time value.
// encoding/json does not treat it as "empty" for omitempty, so we filter it explicitly.
const zeroTimeStr = "0001-01-01T00:00:00Z"
// plantUMLToFlatMap marshals v to JSON and flattens the resulting object into
// a map[string]string using dot notation for nested keys, skipping zero values
// and known meta fields.
func plantUMLToFlatMap(v interface{}) map[string]string {
b, err := json.Marshal(v)
if err != nil {
return nil
}
var raw map[string]interface{}
if err := json.Unmarshal(b, &raw); err != nil {
return nil
}
result := map[string]string{}
plantUMLFlattenJSON(raw, "", result)
return result
}
// plantUMLFlattenJSON recursively walks a JSON object and writes scalar leaf
// values into result using dot-notation keys.
func plantUMLFlattenJSON(m map[string]interface{}, prefix string, result map[string]string) {
for k, v := range m {
fullKey := k
if prefix != "" {
fullKey = prefix + "." + k
}
// Skip fields whose root key is in the deny-list
if plantUMLSkipFields[strings.SplitN(fullKey, ".", 2)[0]] {
continue
}
switch val := v.(type) {
case map[string]interface{}:
plantUMLFlattenJSON(val, fullKey, result)
case []interface{}:
// Arrays are not representable in flat human-readable format; skip.
case float64:
if val != 0 {
if val == float64(int64(val)) {
result[fullKey] = strconv.FormatInt(int64(val), 10)
} else {
result[fullKey] = strconv.FormatFloat(val, 'f', -1, 64)
}
}
case bool:
if val {
result[fullKey] = "true"
}
case string:
if val != "" && val != zeroTimeStr {
result[fullKey] = val
}
}
}
}
// plantUMLFlatMapToComment converts a flat map to a sorted "key: value, …" string.
func plantUMLFlatMapToComment(m map[string]string) string {
if len(m) == 0 {
return ""
}
keys := make([]string, 0, len(m))
for k := range m {
keys = append(keys, k)
}
sort.Strings(keys)
parts := make([]string, 0, len(keys))
for _, k := range keys {
parts = append(parts, k+": "+m[k])
}
return strings.Join(parts, ", ")
}

View File

@@ -6,16 +6,20 @@ import (
"errors"
"fmt"
"mime/multipart"
"strconv"
"strings"
"time"
"cloud.o-forge.io/core/oc-lib/models/booking"
"cloud.o-forge.io/core/oc-lib/models/booking/planner"
"cloud.o-forge.io/core/oc-lib/models/collaborative_area/shallow_collaborative_area"
"cloud.o-forge.io/core/oc-lib/models/common"
"cloud.o-forge.io/core/oc-lib/models/common/models"
"cloud.o-forge.io/core/oc-lib/models/common/pricing"
"cloud.o-forge.io/core/oc-lib/models/live"
"cloud.o-forge.io/core/oc-lib/models/peer"
"cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/models/resources/native_tools"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/models/workflow/graph"
"cloud.o-forge.io/core/oc-lib/tools"
@@ -137,41 +141,82 @@ func (d *Workflow) ExtractFromPlantUML(plantUML multipart.File, request *tools.A
},
}
},
// WorkflowEvent creates a NativeTool of Kind=WORKFLOW_EVENT directly,
// without DB lookup. It has no user-defined instance.
"WorkflowEvent": func() resources.ResourceInterface {
return &resources.NativeTool{
Kind: int(native_tools.WORKFLOW_EVENT),
}
},
}
graphVarName := map[string]*graph.GraphItem{}
scanner := bufio.NewScanner(plantUML)
graphVarName := map[string]graph.GraphItem{}
// Collect all lines first to support look-ahead (comment on the line after
// the declaration, as produced by ToPlantUML).
scanner := bufio.NewScanner(plantUML)
var lines []string
for scanner.Scan() {
line := scanner.Text()
lines = append(lines, scanner.Text())
}
if err := scanner.Err(); err != nil {
return d, err
}
for i, line := range lines {
trimmed := strings.TrimSpace(line)
// Skip pure comment lines and PlantUML directives — they must never be
// parsed as resource declarations or links. Without this guard, a comment
// like "' source: http://my-server.com" would match the "-" link check.
if strings.HasPrefix(trimmed, "'") ||
strings.HasPrefix(trimmed, "!") ||
strings.HasPrefix(trimmed, "@") ||
trimmed == "" {
continue
}
// Build the parse line: if the current line has no inline comment and the
// next line is a pure comment, append it so parsers receive one combined line.
// Also handles the legacy inline-comment format unchanged.
parseLine := line
if !strings.Contains(line, "'") && i+1 < len(lines) {
if next := strings.TrimSpace(lines[i+1]); strings.HasPrefix(next, "'") {
parseLine = line + " " + next
}
}
for n, new := range resourceCatalog {
if strings.Contains(line, n+"(") && !strings.Contains(line, "!procedure") { // should exclude declaration of type.
if strings.Contains(line, n+"(") && !strings.Contains(line, "!procedure") && !strings.Contains(line, "!define") { // exclude macro declarations
newRes := new()
varName, graphItem, err := d.extractResourcePlantUML(line, newRes, n, request.PeerID)
newRes.SetID(uuid.New().String())
varName, graphItem, err := d.extractResourcePlantUML(parseLine, newRes, n, request.PeerID)
if err != nil {
return d, err
}
graphVarName[varName] = graphItem
if graphItem != nil {
graphVarName[varName] = *graphItem
}
continue
} else if strings.Contains(line, n+"-->") {
err := d.extractLink(line, graphVarName, "-->", false)
} else if strings.Contains(line, "-->") {
err := d.extractLink(parseLine, graphVarName, "-->", false)
if err != nil {
fmt.Println(err)
continue
}
} else if strings.Contains(line, n+"<--") {
err := d.extractLink(line, graphVarName, "<--", true)
} else if strings.Contains(line, "<--") {
err := d.extractLink(parseLine, graphVarName, "<--", true)
if err != nil {
fmt.Println(err)
continue
}
} else if strings.Contains(line, n+"--") {
err := d.extractLink(line, graphVarName, "--", false)
} else if strings.Contains(line, "--") {
err := d.extractLink(parseLine, graphVarName, "--", false)
if err != nil {
fmt.Println(err)
continue
}
} else if strings.Contains(line, n+"-") {
err := d.extractLink(line, graphVarName, "-", false)
} else if strings.Contains(line, "-") {
err := d.extractLink(parseLine, graphVarName, "-", false)
if err != nil {
fmt.Println(err)
continue
@@ -179,39 +224,104 @@ func (d *Workflow) ExtractFromPlantUML(plantUML multipart.File, request *tools.A
}
}
}
if err := scanner.Err(); err != nil {
return d, err
}
d.generateResource(d.GetResources(tools.DATA_RESOURCE), request)
d.generateResource(d.GetResources(tools.PROCESSING_RESOURCE), request)
d.generateResource(d.GetResources(tools.STORAGE_RESOURCE), request)
d.generateResource(d.GetResources(tools.COMPUTE_RESOURCE), request)
d.generateResource(d.GetResources(tools.WORKFLOW_RESOURCE), request)
d.Graph.Items = graphVarName
return d, nil
}
func (d *Workflow) generateResource(datas []resources.ResourceInterface, request *tools.APIRequest) error {
for _, d := range datas {
access := d.GetAccessor(request)
if _, code, err := access.LoadOne(d.GetID()); err != nil && code == 200 {
if d.GetType() == tools.COMPUTE_RESOURCE.String() {
access := live.NewAccessor[*live.LiveDatacenter](tools.LIVE_DATACENTER, request)
if b, err := json.Marshal(d); err == nil {
var liv live.LiveDatacenter
json.Unmarshal(b, &liv)
data, _, err := access.StoreOne(&liv)
if err == nil {
access.CopyOne(data)
}
}
continue
} else if d.GetType() == tools.STORAGE_RESOURCE.String() {
access := live.NewAccessor[*live.LiveStorage](tools.LIVE_STORAGE, request)
if b, err := json.Marshal(d); err == nil {
var liv live.LiveStorage
json.Unmarshal(b, &liv)
data, _, err := access.StoreOne(&liv)
if err == nil {
access.CopyOne(data)
}
}
continue
}
access.StoreOne(d)
d.GetAccessor(request).StoreOne(d)
}
return nil
}
func (d *Workflow) extractLink(line string, graphVarName map[string]*graph.GraphItem, pattern string, reverse bool) error {
// setNestedKey sets a value in a nested map using dot-notation path.
// "access.container.image" → m["access"]["container"]["image"] = value
func setNestedKey(m map[string]any, path string, value any) {
parts := strings.SplitN(path, ".", 2)
if len(parts) == 1 {
m[path] = value
return
}
key, rest := parts[0], parts[1]
if _, ok := m[key]; !ok {
m[key] = map[string]any{}
}
if sub, ok := m[key].(map[string]any); ok {
setNestedKey(sub, rest, value)
}
}
// parseHumanFriendlyAttrs converts a human-friendly comment into JSON bytes.
// Supports:
// - flat: "source: http://example.com, encryption: true, size: 500"
// - nested: "access.container.image: nginx, access.container.tag: latest"
// - raw JSON passthrough (backward-compat): '{"key": "value"}'
//
// Values are auto-typed: bool, float64, or string.
// Note: the first ':' in each pair is the key/value separator,
// so URLs like "http://..." are handled correctly.
func parseHumanFriendlyAttrs(comment string) []byte {
comment = strings.TrimSpace(comment)
if strings.HasPrefix(comment, "{") {
return []byte(comment)
}
m := map[string]any{}
for _, pair := range strings.Split(comment, ",") {
pair = strings.TrimSpace(pair)
parts := strings.SplitN(pair, ":", 2)
if len(parts) != 2 {
continue
}
key := strings.TrimSpace(parts[0])
val := strings.TrimSpace(parts[1])
var typed any
if b, err := strconv.ParseBool(val); err == nil {
typed = b
} else if n, err := strconv.ParseFloat(val, 64); err == nil {
typed = n
} else {
typed = val
}
setNestedKey(m, key, typed)
}
b, _ := json.Marshal(m)
return b
}
func (d *Workflow) extractLink(line string, graphVarName map[string]graph.GraphItem, pattern string, reverse bool) error {
splitted := strings.Split(line, pattern)
if len(splitted) < 2 {
return errors.New("links elements not found")
}
if graphVarName[splitted[0]] != nil {
return errors.New("links elements not found -> " + strings.Trim(splitted[0], " "))
}
if graphVarName[splitted[1]] != nil {
return errors.New("links elements not found -> " + strings.Trim(splitted[1], " "))
}
link := &graph.GraphLink{
Source: graph.Position{
ID: graphVarName[splitted[0]].ID,
@@ -230,11 +340,10 @@ func (d *Workflow) extractLink(line string, graphVarName map[string]*graph.Graph
link.Source = tmp
}
splittedComments := strings.Split(line, "'")
if len(splittedComments) <= 1 {
return errors.New("Can't deserialize Object, there's no commentary")
if len(splittedComments) > 1 {
comment := strings.ReplaceAll(splittedComments[1], "'", "")
json.Unmarshal(parseHumanFriendlyAttrs(comment), link)
}
comment := strings.ReplaceAll(splittedComments[1], "'", "") // for now it's a json.
json.Unmarshal([]byte(comment), link)
d.Graph.Links = append(d.Graph.Links, *link)
return nil
}
@@ -245,7 +354,7 @@ func (d *Workflow) extractResourcePlantUML(line string, resource resources.Resou
return "", nil, errors.New("Can't deserialize Object, there's no func")
}
splittedParams := strings.Split(splittedFunc[1], ",")
if len(splittedFunc) <= 1 {
if len(splittedParams) <= 1 {
return "", nil, errors.New("Can't deserialize Object, there's no params")
}
@@ -255,32 +364,40 @@ func (d *Workflow) extractResourcePlantUML(line string, resource resources.Resou
if len(splitted) <= 1 {
return "", nil, errors.New("Can't deserialize Object, there's no name")
}
resource.SetName(splitted[1])
resource.SetName(strings.ReplaceAll(splitted[1], "\\n", " "))
splittedComments := strings.Split(line, "'")
if len(splittedComments) <= 1 {
return "", nil, errors.New("Can't deserialize Object, there's no commentary")
}
comment := strings.ReplaceAll(splittedComments[1], "'", "") // for now it's a json.
// Resources with instances get a default one seeded from the parent resource,
// then overridden by any explicit comment attributes.
// Event (NativeTool) has no instance: getNewInstance returns nil and is skipped.
instance := d.getNewInstance(dataName, splitted[1], peerID)
if instance == nil {
return "", nil, errors.New("No instance found.")
if instance != nil {
if b, err := json.Marshal(resource); err == nil {
json.Unmarshal(b, instance)
}
splittedComments := strings.Split(line, "'")
if len(splittedComments) > 1 {
comment := strings.ReplaceAll(splittedComments[1], "'", "")
json.Unmarshal(parseHumanFriendlyAttrs(comment), instance)
}
resource.AddInstances(instance)
}
resource.AddInstances(instance)
json.Unmarshal([]byte(comment), instance)
// deserializer les instances... une instance doit par défaut avoir certaines valeurs d'accès.
graphID := uuid.New()
graphItem := &graph.GraphItem{
ID: graphID.String(),
item := d.getNewGraphItem(dataName, resource)
if item != nil {
d.Graph.Items[item.ID] = *item
}
graphItem = d.getNewGraphItem(dataName, graphItem, resource)
d.Graph.Items[graphID.String()] = *graphItem
return varName, graphItem, nil
return varName, item, nil
}
func (d *Workflow) getNewGraphItem(dataName string, graphItem *graph.GraphItem, resource resources.ResourceInterface) *graph.GraphItem {
func (d *Workflow) getNewGraphItem(dataName string, resource resources.ResourceInterface) *graph.GraphItem {
if resource == nil {
return nil
}
graphItem := &graph.GraphItem{
ID: uuid.New().String(),
ItemResource: &resources.ItemResource{},
}
switch dataName {
case "Data":
d.Datas = append(d.Datas, resource.GetID())
@@ -290,15 +407,13 @@ func (d *Workflow) getNewGraphItem(dataName string, graphItem *graph.GraphItem,
d.Processings = append(d.Processings, resource.GetID())
d.ProcessingResources = append(d.ProcessingResources, resource.(*resources.ProcessingResource))
graphItem.Processing = resource.(*resources.ProcessingResource)
case "Event":
access := resources.NewAccessor[*resources.NativeTool](tools.NATIVE_TOOL, &tools.APIRequest{
Admin: true,
}, func() utils.DBObject { return &resources.NativeTool{} })
t, _, err := access.Search(nil, "WORKFLOW_EVENT", false)
if err == nil && len(t) > 0 {
d.NativeTool = append(d.NativeTool, t[0].GetID())
graphItem.NativeTool = t[0].(*resources.NativeTool)
}
case "WorkflowEvent":
// The resource is already a *NativeTool with Kind=WORKFLOW_EVENT set by the
// catalog factory. We use it directly without any DB lookup.
nt := resource.(*resources.NativeTool)
nt.Name = native_tools.WORKFLOW_EVENT.String()
d.NativeTool = append(d.NativeTool, nt.GetID())
graphItem.NativeTool = nt
case "Storage":
d.Storages = append(d.Storages, resource.GetID())
d.StorageResources = append(d.StorageResources, resource.(*resources.StorageResource))
@@ -308,7 +423,7 @@ func (d *Workflow) getNewGraphItem(dataName string, graphItem *graph.GraphItem,
d.ComputeResources = append(d.ComputeResources, resource.(*resources.ComputeResource))
graphItem.Compute = resource.(*resources.ComputeResource)
default:
return graphItem
return nil
}
return graphItem
}
@@ -480,8 +595,36 @@ func (wfa *Workflow) CheckBooking(caller *tools.HTTPCaller) (bool, error) {
return true, nil
}
func (wf *Workflow) Planify(start time.Time, end *time.Time, instances ConfigItem, partnerships ConfigItem, buyings ConfigItem, strategies ConfigItem, bookingMode int, request *tools.APIRequest) (bool, float64, map[tools.DataType]map[string]pricing.PricedItemITF, *Workflow, error) {
// preemptDelay is the minimum lead time granted before a preempted booking starts.
const preemptDelay = 30 * time.Second
// Planify computes the scheduled start/end for every resource in the workflow.
//
// bookingMode controls availability checking when p (a live planner snapshot) is provided:
// - PREEMPTED : start from now+preemptDelay regardless of existing load.
// - WHEN_POSSIBLE: start from max(now, start); if a slot conflicts, slide to the next free window.
// - PLANNED : use start as-is; return an error if the slot is not available.
//
// Passing p = nil skips all availability checks (useful for sub-workflow recursion).
func (wf *Workflow) Planify(start time.Time, end *time.Time, instances ConfigItem, partnerships ConfigItem, buyings ConfigItem, strategies ConfigItem, bookingMode int, p planner.PlannerITF, request *tools.APIRequest) (bool, float64, map[tools.DataType]map[string]pricing.PricedItemITF, *Workflow, error) {
// 1. Adjust global start based on booking mode.
now := time.Now()
switch booking.BookingMode(bookingMode) {
case booking.PREEMPTED:
if earliest := now.Add(preemptDelay); start.Before(earliest) {
start = earliest
}
case booking.WHEN_POSSIBLE:
if start.Before(now) {
start = now
}
// PLANNED: honour the caller's start date as-is.
}
priceds := map[tools.DataType]map[string]pricing.PricedItemITF{}
var err error
// 2. Plan processings first so we can derive the total workflow duration.
ps, priceds, err := plan[*resources.ProcessingResource](tools.PROCESSING_RESOURCE, instances, partnerships, buyings, strategies, bookingMode, wf, priceds, request, wf.Graph.IsProcessing,
func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64, error) {
d, err := wf.Graph.GetAverageTimeProcessingBeforeStart(0, res.GetID(),
@@ -492,12 +635,17 @@ func (wf *Workflow) Planify(start time.Time, end *time.Time, instances ConfigIte
}
return start.Add(time.Duration(d) * time.Second), priced.GetExplicitDurationInS(), nil
}, func(started time.Time, duration float64) (*time.Time, error) {
s := started.Add(time.Duration(duration))
s := started.Add(time.Duration(duration) * time.Second)
return &s, nil
})
if err != nil {
return false, 0, priceds, nil, err
}
// Total workflow duration used as the booking window for compute/storage.
// Returns -1 if any processing is a service (open-ended).
workflowDuration := common.GetPlannerLongestTime(priceds)
if _, priceds, err = plan[resources.ResourceInterface](tools.NATIVE_TOOL, instances, partnerships, buyings, strategies, bookingMode, wf, priceds, request,
wf.Graph.IsNativeTool, func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64, error) {
return start, 0, nil
@@ -514,11 +662,13 @@ func (wf *Workflow) Planify(start time.Time, end *time.Time, instances ConfigIte
}); err != nil {
return false, 0, priceds, nil, err
}
// 3. Compute/storage: duration = total workflow duration (conservative bound).
for k, f := range map[tools.DataType]func(graph.GraphItem) bool{tools.STORAGE_RESOURCE: wf.Graph.IsStorage,
tools.COMPUTE_RESOURCE: wf.Graph.IsCompute} {
if _, priceds, err = plan[resources.ResourceInterface](k, instances, partnerships, buyings, strategies, bookingMode, wf, priceds, request,
f, func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64, error) {
nearestStart, longestDuration, err := wf.Graph.GetAverageTimeRelatedToProcessingActivity(start, ps, res, func(i graph.GraphItem) (r resources.ResourceInterface) {
nearestStart, _, err := wf.Graph.GetAverageTimeRelatedToProcessingActivity(ps, res, func(i graph.GraphItem) (r resources.ResourceInterface) {
if f(i) {
_, r = i.GetResource()
}
@@ -526,27 +676,31 @@ func (wf *Workflow) Planify(start time.Time, end *time.Time, instances ConfigIte
}, *instances.Get(res.GetID()), *partnerships.Get(res.GetID()),
*buyings.Get(res.GetID()), *strategies.Get(res.GetID()), bookingMode, request)
if err != nil {
return start, longestDuration, err
return start, workflowDuration, err
}
return start.Add(time.Duration(nearestStart) * time.Second), longestDuration, nil
return start.Add(time.Duration(nearestStart) * time.Second), workflowDuration, nil
}, func(started time.Time, duration float64) (*time.Time, error) {
s := started.Add(time.Duration(duration))
if duration < 0 {
return nil, nil // service: open-ended booking
}
s := started.Add(time.Duration(duration) * time.Second)
return &s, nil
}); err != nil {
return false, 0, priceds, nil, err
}
}
longest := common.GetPlannerLongestTime(end, priceds, request)
longest := workflowDuration
if _, priceds, err = plan[resources.ResourceInterface](tools.WORKFLOW_RESOURCE, instances, partnerships, buyings, strategies,
bookingMode, wf, priceds, request, wf.Graph.IsWorkflow,
func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64, error) {
start := start.Add(time.Duration(common.GetPlannerNearestStart(start, priceds, request)) * time.Second)
start := start.Add(time.Duration(common.GetPlannerNearestStart(start, priceds)) * time.Second)
longest := float64(-1)
r, code, err := res.GetAccessor(request).LoadOne(res.GetID())
if code != 200 || err != nil {
return start, longest, err
}
_, neoLongest, priceds2, _, err := r.(*Workflow).Planify(start, end, instances, partnerships, buyings, strategies, bookingMode, request)
_, neoLongest, priceds2, _, err := r.(*Workflow).Planify(start, end, instances, partnerships, buyings, strategies, bookingMode, nil, request)
// should ... import priced
if err != nil {
return start, longest, err
@@ -564,13 +718,26 @@ func (wf *Workflow) Planify(start time.Time, end *time.Time, instances ConfigIte
}
}
return start.Add(time.Duration(common.GetPlannerNearestStart(start, priceds, request)) * time.Second), longest, nil
return start.Add(time.Duration(common.GetPlannerNearestStart(start, priceds)) * time.Second), longest, nil
}, func(start time.Time, longest float64) (*time.Time, error) {
s := start.Add(time.Duration(longest) * time.Second)
return &s, nil
}); err != nil {
return false, 0, priceds, nil, err
}
// 4. Availability check against the live planner (skipped for PREEMPTED and sub-workflows).
if p != nil && booking.BookingMode(bookingMode) != booking.PREEMPTED {
slide, err := plannerAvailabilitySlide(p, priceds, booking.BookingMode(bookingMode))
if err != nil {
return false, 0, priceds, nil, err
}
if slide > 0 {
// Re-plan from the corrected start; pass nil planner to avoid infinite recursion.
return wf.Planify(start.Add(slide), end, instances, partnerships, buyings, strategies, bookingMode, nil, request)
}
}
isPreemptible := true
for _, first := range wf.GetFirstItems() {
_, res := first.GetResource()
@@ -582,6 +749,36 @@ func (wf *Workflow) Planify(start time.Time, end *time.Time, instances ConfigIte
return isPreemptible, longest, priceds, wf, nil
}
// plannerAvailabilitySlide checks all compute/storage resources in priceds against the planner.
// For PLANNED mode it returns an error immediately on the first conflict.
// For WHEN_POSSIBLE it returns the maximum slide (duration to add to global start) needed to
// clear all conflicts, or 0 if the plan is already conflict-free.
func plannerAvailabilitySlide(p planner.PlannerITF, priceds map[tools.DataType]map[string]pricing.PricedItemITF, mode booking.BookingMode) (time.Duration, error) {
maxSlide := time.Duration(0)
for _, dt := range []tools.DataType{tools.COMPUTE_RESOURCE, tools.STORAGE_RESOURCE} {
for _, priced := range priceds[dt] {
locStart := priced.GetLocationStart()
locEnd := priced.GetLocationEnd()
if locStart == nil || locEnd == nil {
continue // open-ended: skip availability check
}
d := locEnd.Sub(*locStart)
next := p.NextAvailableStart(priced.GetID(), priced.GetInstanceID(), *locStart, d)
slide := next.Sub(*locStart)
if slide <= 0 {
continue
}
if mode == booking.PLANNED {
return 0, errors.New("requested slot is not available for resource " + priced.GetID())
}
if slide > maxSlide {
maxSlide = slide
}
}
}
return maxSlide, nil
}
// Returns a map of DataType (processing,computing,data,storage,worfklow) where each resource (identified by its UUID)
// is mapped to the list of its items (different appearance) in the graph
// ex: if the same Minio storage is represented by several nodes in the graph, in [tools.STORAGE_RESSOURCE] its UUID will be mapped to
@@ -642,9 +839,6 @@ func plan[T resources.ResourceInterface](
priced.SetLocationEnd(*e)
}
}
if e, err := end(started, priced.GetExplicitDurationInS()); err != nil && e != nil {
priced.SetLocationEnd(*e)
}
resources = append(resources, realItem.(T))
if priceds[dt][item.ID] != nil {
priced.AddQuantity(priceds[dt][item.ID].GetQuantity())

View File

@@ -2,7 +2,6 @@ package workflow_execution
import (
"encoding/json"
"fmt"
"strings"
"time"
@@ -66,7 +65,7 @@ func (wfa *WorkflowExecution) Equals(we *WorkflowExecution) bool {
func (ws *WorkflowExecution) PurgeDraft(request *tools.APIRequest) error {
if ws.EndDate == nil {
// if no end... then Book like a savage
e := ws.ExecDate.Add(time.Hour)
e := ws.ExecDate.UTC().Add(time.Hour)
ws.EndDate = &e
}
accessor := ws.GetAccessor(request)
@@ -133,7 +132,6 @@ func (d *WorkflowExecution) Buy(bs pricing.BillingStrategy, executionsID string,
purchases = append(purchases, d.buyEach(bs, executionsID, wfID, tools.DATA_RESOURCE, priceds[tools.DATA_RESOURCE])...)
d.PurchasesState = map[string]bool{}
for _, p := range purchases {
p.SetID(uuid.NewString())
d.PurchasesState[p.GetID()] = false
}
return purchases
@@ -142,7 +140,6 @@ func (d *WorkflowExecution) Buy(bs pricing.BillingStrategy, executionsID string,
func (d *WorkflowExecution) buyEach(bs pricing.BillingStrategy, executionsID string, wfID string, dt tools.DataType, priceds map[string]pricing.PricedItemITF) []*purchase_resource.PurchaseResource {
items := []*purchase_resource.PurchaseResource{}
for itemID, priced := range priceds {
fmt.Println(priced.IsPurchasable(), bs)
if !priced.IsPurchasable() || bs != pricing.BILL_ONCE { // buy only that must be buy
continue
}
@@ -156,7 +153,7 @@ func (d *WorkflowExecution) buyEach(bs pricing.BillingStrategy, executionsID str
d.PeerBuyByGraph[priced.GetCreatorID()][itemID] = []string{}
}
start := d.ExecDate
if s := priced.GetLocationStart(); s != nil {
if s := priced.GetLocationStart(); s != nil && s.After(time.Now()) {
start = *s
}
var m map[string]interface{}
@@ -191,7 +188,6 @@ func (d *WorkflowExecution) Book(executionsID string, wfID string, priceds map[t
booking = append(booking, d.bookEach(executionsID, wfID, tools.COMPUTE_RESOURCE, priceds[tools.COMPUTE_RESOURCE])...)
booking = append(booking, d.bookEach(executionsID, wfID, tools.DATA_RESOURCE, priceds[tools.DATA_RESOURCE])...)
for _, p := range booking {
p.SetID(uuid.NewString())
if d.BookingsState == nil {
d.BookingsState = map[string]bool{}
}
@@ -216,10 +212,20 @@ func (d *WorkflowExecution) bookEach(executionsID string, wfID string, dt tools.
d.PeerBookByGraph[priced.GetCreatorID()][itemID] = []string{}
}
start := d.ExecDate
if s := priced.GetLocationStart(); s != nil {
if s := priced.GetLocationStart(); s != nil && s.After(time.Now()) {
start = *s
}
end := start.Add(time.Duration(priced.GetExplicitDurationInS()) * time.Second)
// Prefer LocationEnd set by Planify; fall back to ExplicitDurationInS only
// when Planify did not compute an end (open-ended / service resources).
var endDate *time.Time
if locEnd := priced.GetLocationEnd(); locEnd != nil {
endDate = locEnd
} else if durationS := priced.GetExplicitDurationInS(); durationS > 0 {
e := start.Add(time.Duration(durationS) * time.Second)
endDate = &e
}
// durationS < 0 means the resource is a service (runs indefinitely):
// leave ExpectedEndDate nil so the booking is open-ended.
var m map[string]interface{}
b, _ := json.Marshal(priced)
json.Unmarshal(b, &m)
@@ -239,7 +245,7 @@ func (d *WorkflowExecution) bookEach(executionsID string, wfID string, dt tools.
WorkflowID: wfID,
ExecutionID: d.GetID(),
ExpectedStartDate: start,
ExpectedEndDate: &end,
ExpectedEndDate: endDate,
}
items = append(items, bookingItem)
d.PeerBookByGraph[priced.GetCreatorID()][itemID] = append(

View File

@@ -24,7 +24,7 @@ func newShallowAccessor(request *tools.APIRequest) *WorkflowExecutionMongoAccess
Request: request,
Type: tools.WORKFLOW_EXECUTION,
New: func() *WorkflowExecution { return &WorkflowExecution{} },
NotImplemented: []string{"DeleteOne", "StoreOne", "CopyOne"},
NotImplemented: []string{"CopyOne"},
},
}
}
@@ -37,7 +37,7 @@ func NewAccessor(request *tools.APIRequest) *WorkflowExecutionMongoAccessor {
Request: request,
Type: tools.WORKFLOW_EXECUTION,
New: func() *WorkflowExecution { return &WorkflowExecution{} },
NotImplemented: []string{"DeleteOne", "StoreOne", "CopyOne"},
NotImplemented: []string{"CopyOne"},
},
}
}
@@ -52,7 +52,7 @@ func (wfa *WorkflowExecutionMongoAccessor) UpdateOne(set map[string]interface{},
}
func (a *WorkflowExecutionMongoAccessor) LoadOne(id string) (utils.DBObject, int, error) {
return utils.GenericLoadOne[*WorkflowExecution](id, a.New(), func(d utils.DBObject) (utils.DBObject, int, error) {
return utils.GenericLoadOne(id, a.New(), func(d utils.DBObject) (utils.DBObject, int, error) {
now := time.Now()
now = now.Add(time.Second * -60)
if d.(*WorkflowExecution).State == enum.DRAFT && !a.shallow && now.UTC().After(d.(*WorkflowExecution).ExecDate) {

View File

@@ -13,7 +13,6 @@ import (
func LoadKeyFromFilePrivate() (crypto.PrivKey, error) {
path := config.GetConfig().PrivateKeyPath
fmt.Println(path)
data, err := os.ReadFile(path)
if err != nil {
return nil, err

View File

@@ -35,12 +35,16 @@ type KubernetesService struct {
}
func NewDynamicClient(host string, ca string, cert string, data string) (*dynamic.DynamicClient, error) {
decodedCa, _ := base64.StdEncoding.DecodeString(ca)
decodedCert, _ := base64.StdEncoding.DecodeString(cert)
decodedKey, _ := base64.StdEncoding.DecodeString(data)
config := &rest.Config{
Host: host,
TLSClientConfig: rest.TLSClientConfig{
CAData: []byte(ca),
CertData: []byte(cert),
KeyData: []byte(data),
CAData: []byte(decodedCa),
CertData: []byte(decodedCert),
KeyData: []byte(decodedKey),
},
}
@@ -56,18 +60,21 @@ func NewDynamicClient(host string, ca string, cert string, data string) (*dynami
}
func NewKubernetesService(host string, ca string, cert string, data string) (*KubernetesService, error) {
decodedCa, _ := base64.StdEncoding.DecodeString(ca)
decodedCert, _ := base64.StdEncoding.DecodeString(cert)
decodedKey, _ := base64.StdEncoding.DecodeString(data)
config := &rest.Config{
Host: host,
TLSClientConfig: rest.TLSClientConfig{
CAData: []byte(ca),
CertData: []byte(cert),
KeyData: []byte(data),
CAData: []byte(decodedCa),
CertData: []byte(decodedCert),
KeyData: []byte(decodedKey),
},
}
// Create clientset
clientset, err := kubernetes.NewForConfig(config)
fmt.Println("NewForConfig", clientset, err)
if err != nil {
return nil, errors.New("Error creating Kubernetes client: " + err.Error())
}
@@ -84,38 +91,6 @@ func NewKubernetesService(host string, ca string, cert string, data string) (*Ku
}, nil
}
func NewRemoteKubernetesService(url string, ca string, cert string, key string) (*KubernetesService, error) {
decodedCa, _ := base64.StdEncoding.DecodeString(ca)
decodedCert, _ := base64.StdEncoding.DecodeString(cert)
decodedKey, _ := base64.StdEncoding.DecodeString(key)
config := &rest.Config{
Host: url + ":6443",
TLSClientConfig: rest.TLSClientConfig{
CAData: decodedCa,
CertData: decodedCert,
KeyData: decodedKey,
},
}
// Create clientset
clientset, err := kubernetes.NewForConfig(config)
fmt.Println("NewForConfig", clientset, err)
if err != nil {
return nil, errors.New("Error creating Kubernetes client: " + err.Error())
}
if clientset == nil {
return nil, errors.New("Error creating Kubernetes client: clientset is nil")
}
return &KubernetesService{
Set: clientset,
Host: url,
CA: string(decodedCa),
Cert: string(decodedCert),
Data: string(decodedKey),
}, nil
}
func (k *KubernetesService) CreateNamespace(ctx context.Context, ns string) error {
// Define the namespace
fmt.Println("ExecutionID in CreateNamespace() : ", ns)
@@ -128,7 +103,7 @@ func (k *KubernetesService) CreateNamespace(ctx context.Context, ns string) erro
},
}
// Create the namespace
fmt.Println("Creating namespace...", k.Set)
fmt.Println("Creating namespace...")
if _, err := k.Set.CoreV1().Namespaces().Create(ctx, namespace, metav1.CreateOptions{}); err != nil {
return errors.New("Error creating namespace: " + err.Error())
}
@@ -208,6 +183,40 @@ func (k *KubernetesService) CreateRoleBinding(ctx context.Context, ns string, ro
return nil
}
// ProvisionExecutionNamespace creates the full Argo execution environment for a
// namespace: namespace, service-account, role and role-binding. Idempotent — if
// the namespace already exists the call is a no-op.
func (k *KubernetesService) ProvisionExecutionNamespace(ctx context.Context, ns string) error {
existing, _ := k.GetNamespace(ctx, ns)
if existing != nil {
return nil
}
if err := k.CreateNamespace(ctx, ns); err != nil && !strings.Contains(err.Error(), "already exists") {
return err
}
if err := k.CreateServiceAccount(ctx, ns); err != nil && !strings.Contains(err.Error(), "already exists") {
return err
}
role := "argo-role"
if err := k.CreateRole(ctx, ns, role,
[][]string{{"coordination.k8s.io"}, {""}, {""}},
[][]string{{"leases"}, {"secrets"}, {"pods"}},
[][]string{{"get", "create", "update"}, {"get"}, {"patch"}},
); err != nil {
return err
}
return k.CreateRoleBinding(ctx, ns, "argo-role-binding", role)
}
// TeardownExecutionNamespace deletes the namespace and lets Kubernetes cascade
// the deletion of all contained resources (SA, Role, RoleBinding, pods…).
func (k *KubernetesService) TeardownExecutionNamespace(ctx context.Context, ns string) error {
if err := k.Set.CoreV1().Namespaces().Delete(ctx, ns, metav1.DeleteOptions{}); err != nil {
return errors.New("error deleting namespace " + ns + ": " + err.Error())
}
return nil
}
func (k *KubernetesService) DeleteNamespace(ctx context.Context, ns string, f func()) error {
targetGVR := schema.GroupVersionResource{
Group: "multicluster.admiralty.io",
@@ -270,17 +279,12 @@ func (k *KubernetesService) GetTargets(ctx context.Context) ([]string, error) {
return nil, err
}
fmt.Println(string(resp))
var targetDict map[string]interface{}
err = json.Unmarshal(resp, &targetDict)
if err != nil {
fmt.Println("TODO: handle the error when unmarshalling k8s API response")
return nil, err
}
b, _ := json.MarshalIndent(targetDict, "", " ")
fmt.Println(string(b))
data := targetDict["items"].([]interface{})
for _, item := range data {
@@ -390,7 +394,6 @@ func (k *KubernetesService) CreateKubeconfigSecret(context context.Context, kube
// config, err := base64.RawStdEncoding.DecodeString(kubeconfig)
if err != nil {
fmt.Println("Error while encoding kubeconfig")
fmt.Println(err)
return nil, err
}
@@ -400,21 +403,6 @@ func (k *KubernetesService) CreateKubeconfigSecret(context context.Context, kube
"config": config,
},
)
// exists, err := k.GetKubeconfigSecret(context,executionId)
// if err != nil {
// fmt.Println("Error verifying if kube secret exists in namespace ", executionId)
// return nil, err
// }
// if exists != nil {
// fmt.Println("kube-secret already exists in namespace", executionId)
// fmt.Println("Overriding existing kube-secret with a newer resource")
// // TODO : implement DeleteKubeConfigSecret(executionID)
// deleted, err := k.DeleteKubeConfigSecret(executionId)
// _ = deleted
// _ = err
// }
resp, err := k.Set.CoreV1().
Secrets(executionId).
Apply(context,
@@ -425,14 +413,12 @@ func (k *KubernetesService) CreateKubeconfigSecret(context context.Context, kube
if err != nil {
fmt.Println("Error while trying to contact API to get secret kube-secret-" + executionId)
fmt.Println(err)
return nil, err
}
data, err := json.Marshal(resp)
if err != nil {
fmt.Println("Couldn't marshal resp from : ", data)
fmt.Println(err)
return nil, err
}
return data, nil
@@ -449,7 +435,6 @@ func (k *KubernetesService) GetKubeconfigSecret(context context.Context, executi
return nil, nil
}
fmt.Println("Error while trying to contact API to get secret kube-secret-" + executionId)
fmt.Println(err)
return nil, err
}
@@ -457,7 +442,6 @@ func (k *KubernetesService) GetKubeconfigSecret(context context.Context, executi
if err != nil {
fmt.Println("Couldn't marshal resp from : ", data)
fmt.Println(err)
return nil, err
}
@@ -512,15 +496,14 @@ func dynamicClientApply(host string, ca string, cert string, data string, execut
},
)
if err != nil {
o, err := json.Marshal(object)
fmt.Println("Error from k8s API when applying "+fmt.Sprint(string(o))+" to "+gvrSources.String()+" : ", err)
fmt.Println("Error from k8s API when applying "+fmt.Sprintf("%v", object)+" to "+gvrSources.String()+" : ", err)
return nil, err
}
// We can add more info to the log with the content of resp if not nil
resByte, err := json.Marshal(res)
if err != nil {
// fmt.Println("Error trying to create a Source on remote cluster : ", err , " : ", res)
fmt.Println("Error trying to create a Source on remote cluster : ", err, " : ", res)
return nil, err
}
@@ -578,7 +561,6 @@ func (k *KubernetesService) GetOneNode(context context.Context, executionID stri
)
if err != nil {
fmt.Println("Error getting the list of nodes from k8s API")
fmt.Println(err)
return nil, err
}

View File

@@ -28,7 +28,9 @@ type NATSMethod int
var meths = []string{"remove execution", "create execution", "planner execution", "discovery",
"workflow event", "argo kube event", "create resource", "remove resource",
"propalgation event", "search event",
"propalgation event", "search event", "confirm event",
"considers event", "admiralty config event", "minio config event",
"workflow started event", "workflow step done event", "workflow done event",
}
const (
@@ -45,6 +47,19 @@ const (
PROPALGATION_EVENT
SEARCH_EVENT
CONFIRM_EVENT
CONSIDERS_EVENT
ADMIRALTY_CONFIG_EVENT
MINIO_CONFIG_EVENT
// Workflow lifecycle events emitted by oc-monitord.
// oc-scheduler listens to STARTED and DONE to maintain WorkflowExecution state.
// oc-datacenter listens to STEP_DONE and DONE to close bookings and tear down infra.
WORKFLOW_STARTED_EVENT
WORKFLOW_STEP_DONE_EVENT
WORKFLOW_DONE_EVENT
)
func (n NATSMethod) String() string {
@@ -54,7 +69,9 @@ func (n NATSMethod) String() string {
// NameToMethod returns the NATSMethod enum value from a string
func NameToMethod(name string) NATSMethod {
for _, v := range [...]NATSMethod{REMOVE_EXECUTION, CREATE_EXECUTION, PLANNER_EXECUTION, DISCOVERY, WORKFLOW_EVENT, ARGO_KUBE_EVENT,
CREATE_RESOURCE, REMOVE_RESOURCE, PROPALGATION_EVENT, SEARCH_EVENT} {
CREATE_RESOURCE, REMOVE_RESOURCE, PROPALGATION_EVENT, SEARCH_EVENT, CONFIRM_EVENT,
CONSIDERS_EVENT, ADMIRALTY_CONFIG_EVENT, MINIO_CONFIG_EVENT,
WORKFLOW_STARTED_EVENT, WORKFLOW_STEP_DONE_EVENT, WORKFLOW_DONE_EVENT} {
if strings.Contains(strings.ToLower(v.String()), strings.ToLower(name)) {
return v
}

View File

@@ -0,0 +1,33 @@
package tools
import "time"
// StepMetric carries the outcome of one Argo step node as observed by oc-monitord.
// Embedded in WorkflowLifecycleEvent.Steps for the WORKFLOW_DONE_EVENT recap.
type StepMetric struct {
BookingID string `json:"booking_id"`
State int `json:"state"`
RealStart *time.Time `json:"real_start,omitempty"`
RealEnd *time.Time `json:"real_end,omitempty"`
}
// WorkflowLifecycleEvent is the NATS payload emitted by oc-monitord on
// WORKFLOW_STARTED_EVENT, WORKFLOW_STEP_DONE_EVENT, and WORKFLOW_DONE_EVENT.
//
// - ExecutionID : WorkflowExecution UUID (used by oc-scheduler to update state)
// - ExecutionsID : run-group ID shared by all bookings of the same run
// - BookingID : non-empty only for WORKFLOW_STEP_DONE_EVENT
// - State : target state (enum index: SUCCESS=3, FAILURE=4, STARTED=2, …)
// - RealStart : actual start timestamp recorded by Argo (nil if unknown)
// - RealEnd : actual end timestamp recorded by Argo (nil for STARTED events)
// - Steps : non-nil only for WORKFLOW_DONE_EVENT — full recap of every step
// so oc-scheduler and oc-catalog can catch up if they missed STEP_DONE events
type WorkflowLifecycleEvent struct {
ExecutionID string `json:"execution_id"`
ExecutionsID string `json:"executions_id"`
BookingID string `json:"booking_id,omitempty"`
State int `json:"state"`
RealStart *time.Time `json:"real_start,omitempty"`
RealEnd *time.Time `json:"real_end,omitempty"`
Steps []StepMetric `json:"steps,omitempty"`
}