2 Commits

Author  SHA1        Message                                                                   Date
mr      c87245e83f  Oc-Datacenter Allowed Resource And Prepull Images For Efficient process  2026-03-25 11:11:03 +01:00
mr      dab61463f0  WatchDog Kube                                                             2026-03-24 10:50:36 +01:00
21 changed files with 1660 additions and 493 deletions


@@ -6,13 +6,17 @@ type Config struct {
Mode string
KubeHost string
KubePort string
+// KubeExternalHost is the externally reachable address of this cluster's API server.
+// Used when generating kubeconfigs for remote peers. Must be an IP or hostname
+// reachable from outside the cluster (NOT kubernetes.default.svc.cluster.local).
+KubeExternalHost string
KubeCA string
KubeCert string
KubeData string
MinioRootKey string
MinioRootSecret string
MonitorMode string
MonitorAddress string
}
var instance *Config


@@ -0,0 +1,96 @@
package controllers
import (
"encoding/json"
"slices"
oclib "cloud.o-forge.io/core/oc-lib"
beego "github.com/beego/beego/v2/server/web"
"cloud.o-forge.io/core/oc-lib/models/allowed_image"
)
// AllowedImageController manages the local list of images allowed to persist
// on this peer after a workflow execution.
//
// GET /allowed-image/ → any authenticated user
// GET /allowed-image/:id → any authenticated user
// POST /allowed-image/ → peer admin only
// DELETE /allowed-image/:id → peer admin only (rejected if IsDefault)
type AllowedImageController struct {
beego.Controller
}
// isAdmin checks that the caller is a peer admin ("admin" group in the JWT token).
func isAdmin(groups []string) bool {
return slices.Contains(groups, "admin")
}
// @Title GetAll
// @Description Returns all images allowed to persist on this peer
// @Success 200 {object} []allowed_image.AllowedImage
// @router / [get]
func (o *AllowedImageController) GetAll() {
user, peerID, groups := oclib.ExtractTokenInfo(*o.Ctx.Request)
res := oclib.NewRequest(oclib.LibDataEnum(oclib.ALLOWED_IMAGE), user, peerID, groups, nil).LoadAll(false)
o.Data["json"] = res
o.ServeJSON()
}
// @Title Get
// @Description Returns an allowed image by its ID
// @Param id path string true "Allowed image ID"
// @Success 200 {object} allowed_image.AllowedImage
// @router /:id [get]
func (o *AllowedImageController) Get() {
user, peerID, groups := oclib.ExtractTokenInfo(*o.Ctx.Request)
id := o.Ctx.Input.Param(":id")
res := oclib.NewRequest(oclib.LibDataEnum(oclib.ALLOWED_IMAGE), user, peerID, groups, nil).LoadOne(id)
o.Data["json"] = res
o.ServeJSON()
}
// @Title Post
// @Description Adds an image to the allowed-image list (peer admin only)
// @Param body body allowed_image.AllowedImage true "Image to allow"
// @Success 200 {object} allowed_image.AllowedImage
// @router / [post]
func (o *AllowedImageController) Post() {
user, peerID, groups := oclib.ExtractTokenInfo(*o.Ctx.Request)
if !isAdmin(groups) {
o.Ctx.Output.SetStatus(403)
o.Data["json"] = map[string]string{"err": "peer admin required"}
o.ServeJSON()
return
}
var img allowed_image.AllowedImage
if err := json.Unmarshal(o.Ctx.Input.RequestBody, &img); err != nil {
o.Ctx.Output.SetStatus(400)
o.Data["json"] = map[string]string{"err": err.Error()}
o.ServeJSON()
return
}
img.IsDefault = false // the operator cannot create bootstrap entries through the API
res := oclib.NewRequest(oclib.LibDataEnum(oclib.ALLOWED_IMAGE), user, peerID, groups, nil).StoreOne(img.Serialize(&img))
o.Data["json"] = res
o.ServeJSON()
}
// @Title Delete
// @Description Removes an image from the allowed-image list (peer admin only, bootstrap entries cannot be deleted)
// @Param id path string true "Allowed image ID"
// @Success 200 {object} allowed_image.AllowedImage
// @router /:id [delete]
func (o *AllowedImageController) Delete() {
user, peerID, groups := oclib.ExtractTokenInfo(*o.Ctx.Request)
if !isAdmin(groups) {
o.Ctx.Output.SetStatus(403)
o.Data["json"] = map[string]string{"err": "peer admin required"}
o.ServeJSON()
return
}
id := o.Ctx.Input.Param(":id")
res := oclib.NewRequest(oclib.LibDataEnum(oclib.ALLOWED_IMAGE), user, peerID, groups, nil).DeleteOne(id)
o.Data["json"] = res
o.ServeJSON()
}
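A minimal sketch of how a peer admin could exercise the controller above. Only the /allowed-image/ route, the admin-only rule and the AllowedImage model come from this changeset; the base URL, the JSON field names and the token are placeholder assumptions.

package main

import (
"bytes"
"fmt"
"net/http"
)

func main() {
// Hypothetical payload: allow any 3.x tag of library/python to persist on this peer.
body := []byte(`{"image": "library/python", "tag_constraint": "3.*"}`)
req, _ := http.NewRequest(http.MethodPost, "http://oc-datacenter:8080/allowed-image/", bytes.NewReader(body))
req.Header.Set("Content-Type", "application/json")
// Requests without the "admin" group in the JWT get a 403 from the controller.
req.Header.Set("Authorization", "Bearer <admin-jwt>")
resp, err := http.DefaultClient.Do(req)
if err != nil {
panic(err)
}
defer resp.Body.Close()
fmt.Println(resp.Status)
}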


@@ -1,21 +0,0 @@
package controllers
import (
"fmt"
beego "github.com/beego/beego/v2/server/web"
)
func HandleControllerErrors(c beego.Controller, code int, err *error, data *map[string]interface{}, messages ...string) {
for _, mess := range messages {
fmt.Println(mess)
}
if data != nil {
c.Data["json"] = data
}
if err != nil {
c.Data["json"] = map[string]string{"error": (*err).Error()}
}
c.Ctx.Output.SetStatus(code)
c.ServeJSON()
}


@@ -1,7 +1,6 @@
package controllers
import (
-"fmt"
"oc-datacenter/conf"
"strconv"
@@ -41,7 +40,6 @@ func (o *SessionController) GetToken() {
o.ServeJSON()
return
}
-fmt.Println("BLAPO", id, duration)
token, err := serv.GenerateToken(o.Ctx.Request.Context(), id, duration)
if err != nil {
// change code to 500


@@ -1,5 +1,11 @@
{
"MONGO_URL":"mongodb://mongo:27017/",
"NATS_URL":"nats://nats:4222",
-"MONGO_DATABASE":"DC_myDC"
+"MONGO_DATABASE":"DC_myDC",
"KUBERNETES_SERVICE_HOST": "kubernetes.default.svc.cluster.local",
"KUBERNETES_SERVICE_PORT": "6443",
"KUBE_EXTERNAL_HOST": "",
"KUBE_CA": "LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJkekNDQVIyZ0F3SUJBZ0lCQURBS0JnZ3Foa2pPUFFRREFqQWpNU0V3SHdZRFZRUUREQmhyTTNNdGMyVnkKZG1WeUxXTmhRREUzTnpReU56STVNVEF3SGhjTk1qWXdNekl6TVRNek5URXdXaGNOTXpZd016SXdNVE16TlRFdwpXakFqTVNFd0h3WURWUVFEREJock0zTXRjMlZ5ZG1WeUxXTmhRREUzTnpReU56STVNVEF3V1RBVEJnY3Foa2pPClBRSUJCZ2dxaGtqT1BRTUJCd05DQUFSSGpYRDVpbnRIYWZWSk5VaDFlRnIxcXBKdFlkUmc5NStKVENEa0tadTIKYjUxRXlKaG1zanRIY3BDUndGL1VGMzlvdzY4TFBUcjBxaUorUHlhQTBLZUtvMEl3UURBT0JnTlZIUThCQWY4RQpCQU1DQXFRd0R3WURWUjBUQVFIL0JBVXdBd0VCL3pBZEJnTlZIUTRFRmdRVTdWQkNzZVN3ajJ2cmczMFE5UG8vCnV6ZzAvMjR3Q2dZSUtvWkl6ajBFQXdJRFNBQXdSUUloQUlEOVY2aFlUSS83ZW1hRzU0dDdDWVU3TXFSdDdESUkKNlgvSUwrQ0RLbzlNQWlCdlFEMGJmT0tVWDc4UmRGdUplcEhEdWFUMUExaGkxcWdIUGduM1dZdDBxUT09Ci0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0K",
"KUBE_CERT": "LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJrVENDQVRlZ0F3SUJBZ0lJUU5KbFNJQUJPMDR3Q2dZSUtvWkl6ajBFQXdJd0l6RWhNQjhHQTFVRUF3d1kKYXpOekxXTnNhV1Z1ZEMxallVQXhOemMwTWpjeU9URXdNQjRYRFRJMk1ETXlNekV6TXpVeE1Gb1hEVEkzTURNeQpNekV6TXpVeE1Gb3dNREVYTUJVR0ExVUVDaE1PYzNsemRHVnRPbTFoYzNSbGNuTXhGVEFUQmdOVkJBTVRESE41CmMzUmxiVHBoWkcxcGJqQlpNQk1HQnlxR1NNNDlBZ0VHQ0NxR1NNNDlBd0VIQTBJQUJMY3Uwb2pUbVg4RFhTQkYKSHZwZDZNVEoyTHdXc1lRTmdZVURXRDhTVERIUWlCczlMZ0x5ZTdOMEFvZk85RkNZVW1HamhiaVd3WFVHR3dGTgpUdlRMU2lXalNEQkdNQTRHQTFVZER3RUIvd1FFQXdJRm9EQVRCZ05WSFNVRUREQUtCZ2dyQmdFRkJRY0RBakFmCkJnTlZIU01FR0RBV2dCUlJhRW9wQzc5NGJyTHlnR0g5SVhvbDZTSmlFREFLQmdncWhrak9QUVFEQWdOSUFEQkYKQWlFQWhaRUlrSWV3Y1loL1NmTFVCVjE5MW1CYTNRK0J5S2J5eTVlQmpwL3kzeWtDSUIxWTJicTVOZTNLUUU4RAprNnNzeFJrbjJmN0VoWWVRQU1pUlJ2MjIweDNLCi0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0KLS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJkekNDQVIyZ0F3SUJBZ0lCQURBS0JnZ3Foa2pPUFFRREFqQWpNU0V3SHdZRFZRUUREQmhyTTNNdFkyeHAKWlc1MExXTmhRREUzTnpReU56STVNVEF3SGhjTk1qWXdNekl6TVRNek5URXdXaGNOTXpZd016SXdNVE16TlRFdwpXakFqTVNFd0h3WURWUVFEREJock0zTXRZMnhwWlc1MExXTmhRREUzTnpReU56STVNVEF3V1RBVEJnY3Foa2pPClBRSUJCZ2dxaGtqT1BRTUJCd05DQUFTcTdVTC85MEc1ZmVTaE95NjI3eGFZWlM5dHhFdWFoWFQ3Vk5wZkpQSnMKaEdXd2UxOXdtbXZzdlp6dlNPUWFRSzJaMmttN0hSb1IrNlA1YjIyamczbHVvMEl3UURBT0JnTlZIUThCQWY4RQpCQU1DQXFRd0R3WURWUjBUQVFIL0JBVXdBd0VCL3pBZEJnTlZIUTRFRmdRVVVXaEtLUXUvZUc2eThvQmgvU0Y2Ckpla2lZaEF3Q2dZSUtvWkl6ajBFQXdJRFNBQXdSUUloQUk3cGxHczFtV20ySDErbjRobDBNTk13RmZzd0o5ZXIKTzRGVkM0QzhwRG44QWlCN3NZMVFwd2M5VkRUeGNZaGxuZzZNUzRXai85K0lHWjJxcy94UStrMjdTQT09Ci0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0K",
"KUBE_DATA": "LS0tLS1CRUdJTiBFQyBQUklWQVRFIEtFWS0tLS0tCk1IY0NBUUVFSUROZDRnWXd6aVRhK1hwNnFtNVc3SHFzc1JJNkREaUJTbUV2ZHoxZzk3VGxvQW9HQ0NxR1NNNDkKQXdFSG9VUURRZ0FFdHk3U2lOT1pmd05kSUVVZStsM294TW5ZdkJheGhBMkJoUU5ZUHhKTU1kQ0lHejB1QXZKNwpzM1FDaDg3MFVKaFNZYU9GdUpiQmRRWWJBVTFPOU10S0pRPT0KLS0tLS1FTkQgRUMgUFJJVkFURSBLRVktLS0tLQo="
}
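For a multi-peer deployment the new KUBE_EXTERNAL_HOST key has to be filled with an address that remote peers can actually reach, as the Config comment above states; a hypothetical value (the IP is a placeholder) would be:

"KUBE_EXTERNAL_HOST": "203.0.113.10",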

go.mod

@@ -3,7 +3,6 @@ module oc-datacenter
go 1.25.0
require (
-cloud.o-forge.io/core/oc-lib v0.0.0-20260319071818-28b5b7d39ffe
github.com/beego/beego/v2 v2.3.8
github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674
github.com/minio/madmin-go/v4 v4.1.1
@@ -16,6 +15,7 @@ require (
)
require (
+cloud.o-forge.io/core/oc-lib v0.0.0-20260325092016-4580200e8057 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/biter777/countries v1.7.5 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect

go.sum

@@ -1,5 +1,19 @@
cloud.o-forge.io/core/oc-lib v0.0.0-20260319071818-28b5b7d39ffe h1:CHiWQAX7j/bMfbytCWGL2mUgSWYoDY4+bFQbCHEfypk=
cloud.o-forge.io/core/oc-lib v0.0.0-20260319071818-28b5b7d39ffe/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA=
cloud.o-forge.io/core/oc-lib v0.0.0-20260323080307-5bdd2554a769 h1:TYluuZ28s58KqXrh3Z4nTYje3TVcLJN3VJwVwF9uP0M=
cloud.o-forge.io/core/oc-lib v0.0.0-20260323080307-5bdd2554a769/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA=
cloud.o-forge.io/core/oc-lib v0.0.0-20260323105321-14b449f5473b h1:ouGEzCLGLjUOQ0ciowv9yJv3RhylvUg1GTUlOqXHCSc=
cloud.o-forge.io/core/oc-lib v0.0.0-20260323105321-14b449f5473b/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA=
cloud.o-forge.io/core/oc-lib v0.0.0-20260323111629-fa9893e1508c h1:4T+SJgpeK9+lpVQq68chTiAKdaevwvKYo/veP/cOFRY=
cloud.o-forge.io/core/oc-lib v0.0.0-20260323111629-fa9893e1508c/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA=
cloud.o-forge.io/core/oc-lib v0.0.0-20260323112935-b76b22a8fbee h1:XQ85OdhYry8zolODV0ezS6+Ari36SpXcnRSbP4E6v2k=
cloud.o-forge.io/core/oc-lib v0.0.0-20260323112935-b76b22a8fbee/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA=
cloud.o-forge.io/core/oc-lib v0.0.0-20260323152020-211339947c46 h1:71WVrnLj0SM6PfQxCh25b2JGcL/1MZ2lYt254R/8n28=
cloud.o-forge.io/core/oc-lib v0.0.0-20260323152020-211339947c46/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA=
cloud.o-forge.io/core/oc-lib v0.0.0-20260324114937-6d0c78946e8b h1:y0rppyzGIQTIyvapWwHZ8t20wMaSaMU6NoZLkMCui8w=
cloud.o-forge.io/core/oc-lib v0.0.0-20260324114937-6d0c78946e8b/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA=
cloud.o-forge.io/core/oc-lib v0.0.0-20260325092016-4580200e8057 h1:pR+lZzcCWZ0kke2r2xXa7OpdbLpPW3gZSWZ8gGHh274=
cloud.o-forge.io/core/oc-lib v0.0.0-20260325092016-4580200e8057/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/Masterminds/semver/v3 v3.4.0 h1:Zog+i5UMtVoCU8oKka5P7i9q9HgrJeGzI9SA1Xbatp0=
github.com/Masterminds/semver/v3 v3.4.0/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM=


@@ -1,20 +1,25 @@
-package infrastructure
+package admiralty
import (
"context"
"encoding/base64"
"encoding/json"
+"errors"
"fmt"
+"strings"
"sync"
"time"
"oc-datacenter/conf"
+"oc-datacenter/infrastructure/kubernetes/models"
"oc-datacenter/infrastructure/monitor"
-"oc-datacenter/models"
+"oc-datacenter/infrastructure/storage"
oclib "cloud.o-forge.io/core/oc-lib"
+"cloud.o-forge.io/core/oc-lib/dbs"
+bookingmodel "cloud.o-forge.io/core/oc-lib/models/booking"
+"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
"cloud.o-forge.io/core/oc-lib/tools"
-apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
)
@@ -22,28 +27,23 @@ import (
// kubeconfigChannels holds channels waiting for kubeconfig delivery (keyed by executionID).
var kubeconfigChannels sync.Map
-// kubeconfigEvent is the NATS payload used to transfer the kubeconfig from the source peer to the target peer.
-type KubeconfigEvent struct {
-DestPeerID string `json:"dest_peer_id"`
-ExecutionsID string `json:"executions_id"`
-Kubeconfig string `json:"kubeconfig"`
-SourcePeerID string `json:"source_peer_id"`
-// OriginID is the peer that initiated the provisioning request.
-// The PB_CONSIDERS response is routed back to this peer.
-OriginID string `json:"origin_id"`
-}
// admiraltyConsidersPayload is the PB_CONSIDERS payload emitted after admiralty provisioning.
type admiraltyConsidersPayload struct {
OriginID string `json:"origin_id"`
ExecutionsID string `json:"executions_id"`
+// PeerID is the compute peer (SourcePeerID of the original ArgoKubeEvent).
+// oc-monitord uses it to build a unique considers key per peer, avoiding
+// broadcast collisions when multiple compute peers run in parallel.
+PeerID string `json:"peer_id,omitempty"`
Secret string `json:"secret,omitempty"`
Error *string `json:"error,omitempty"`
}
// emitAdmiraltyConsiders publishes a PB_CONSIDERS back to OriginID with the result
// of the admiralty provisioning. secret is the base64-encoded kubeconfig; err is nil on success.
+// When self is true the origin is the local peer: emits directly on CONSIDERS_EVENT
+// instead of routing through PROPALGATION_EVENT.
-func emitAdmiraltyConsiders(executionsID, originID, secret string, provErr error) {
+func emitAdmiraltyConsiders(executionsID, originID, peerID, secret string, provErr error, self bool) {
var errStr *string
if provErr != nil {
s := provErr.Error()
@@ -52,9 +52,19 @@ func emitAdmiraltyConsiders(executionsID, originID, secret string, provErr error
payload, _ := json.Marshal(admiraltyConsidersPayload{
OriginID: originID,
ExecutionsID: executionsID,
+PeerID: peerID,
Secret: secret,
Error: errStr,
})
+if self {
+go tools.NewNATSCaller().SetNATSPub(tools.CONSIDERS_EVENT, tools.NATSResponse{
+FromApp: "oc-datacenter",
+Datatype: tools.COMPUTE_RESOURCE,
+Method: int(tools.CONSIDERS_EVENT),
+Payload: payload,
+})
+return
+}
b, _ := json.Marshal(&tools.PropalgationMessage{
DataType: tools.COMPUTE_RESOURCE.EnumIndex(),
Action: tools.PB_CONSIDERS,
@@ -83,59 +93,57 @@ func NewAdmiraltySetter(execIDS string) *AdmiraltySetter {
// InitializeAsSource is called on the peer that acts as the SOURCE cluster (compute provider).
// It creates the AdmiraltySource resource, generates a kubeconfig for the target peer,
// and publishes it on NATS so the target peer can complete its side of the setup.
-func (s *AdmiraltySetter) InitializeAsSource(ctx context.Context, localPeerID string, destPeerID string, originID string) {
+func (s *AdmiraltySetter) InitializeAsSource(ctx context.Context, localPeerID string, destPeerID string, originID string, self bool, images []string) error {
logger := oclib.GetLogger()
+// Local execution: no Admiralty resources needed — just emit PB_CONSIDERS.
+if localPeerID == destPeerID {
+emitAdmiraltyConsiders(s.ExecutionsID, originID, localPeerID, "", nil, true)
+return nil
+}
serv, err := tools.NewKubernetesService(conf.GetConfig().KubeHost+":"+conf.GetConfig().KubePort,
conf.GetConfig().KubeCA, conf.GetConfig().KubeCert, conf.GetConfig().KubeData)
if err != nil {
-logger.Error().Msg("InitializeAsSource: failed to create service: " + err.Error())
-return
+return errors.New("InitializeAsSource: failed to create service: " + err.Error())
}
// Create the AdmiraltySource resource on this cluster (inlined from CreateAdmiraltySource controller)
logger.Info().Msg("Creating AdmiraltySource ns-" + s.ExecutionsID)
_, err = serv.CreateAdmiraltySource(ctx, s.ExecutionsID)
-if err != nil && !apierrors.IsAlreadyExists(err) {
-logger.Error().Msg("InitializeAsSource: failed to create source: " + err.Error())
-return
+if err != nil && !strings.Contains(err.Error(), "already exists") {
+return errors.New("InitializeAsSource: failed to create service: " + err.Error())
}
// Generate a service-account token for the namespace (inlined from GetAdmiraltyKubeconfig controller)
token, err := serv.GenerateToken(ctx, s.ExecutionsID, 3600)
if err != nil {
-logger.Error().Msg("InitializeAsSource: failed to generate token for ns-" + s.ExecutionsID + ": " + err.Error())
-return
+return errors.New("InitializeAsSource: failed to generate token for ns-" + s.ExecutionsID + ": " + err.Error())
}
kubeconfig, err := buildHostKubeWithToken(token)
if err != nil {
-logger.Error().Msg("InitializeAsSource: " + err.Error())
-return
+return errors.New("InitializeAsSource: " + err.Error())
}
b, err := json.Marshal(kubeconfig)
if err != nil {
-logger.Error().Msg("InitializeAsSource: failed to marshal kubeconfig: " + err.Error())
-return
+return errors.New("InitializeAsSource: failed to marshal kubeconfig: " + err.Error())
}
encodedKubeconfig := base64.StdEncoding.EncodeToString(b)
-kube := KubeconfigEvent{
+kube := models.KubeconfigEvent{
ExecutionsID: s.ExecutionsID,
Kubeconfig: encodedKubeconfig,
SourcePeerID: localPeerID,
DestPeerID: destPeerID,
OriginID: originID,
+SourceExecutionsID: s.ExecutionsID,
+Images: images,
}
-if destPeerID == localPeerID {
-s.InitializeAsTarget(ctx, kube)
-return
-}
// Publish the kubeconfig on NATS so the target peer can proceed
payload, err := json.Marshal(kube)
if err != nil {
-logger.Error().Msg("InitializeAsSource: failed to marshal kubeconfig event: " + err.Error())
-return
+return errors.New("InitializeAsSource: failed to marshal kubeconfig event: " + err.Error())
}
if b, err := json.Marshal(&tools.PropalgationMessage{
@@ -145,20 +153,22 @@ func (s *AdmiraltySetter) InitializeAsSource(ctx context.Context, localPeerID st
}); err == nil {
go tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{
FromApp: "oc-datacenter",
-Datatype: -1,
+Datatype: tools.COMPUTE_RESOURCE,
User: "",
Method: int(tools.PROPALGATION_EVENT),
Payload: b,
})
}
logger.Info().Msg("InitializeAsSource: kubeconfig published for ns-" + s.ExecutionsID)
+return nil
}
// InitializeAsTarget is called on the peer that acts as the TARGET cluster (scheduler).
// It waits for the kubeconfig published by the source peer via NATS, then creates
// the Secret, AdmiraltyTarget, and polls until the virtual node appears.
-// kubeconfigCh must be obtained from RegisterKubeconfigWaiter before this goroutine starts.
-func (s *AdmiraltySetter) InitializeAsTarget(ctx context.Context, kubeconfigObj KubeconfigEvent) {
+// self must be true when the origin peer is the local peer (direct CONSIDERS_EVENT emission).
+func (s *AdmiraltySetter) InitializeAsTarget(ctx context.Context, kubeconfigObj models.KubeconfigEvent, self bool) {
logger := oclib.GetLogger()
defer kubeconfigChannels.Delete(s.ExecutionsID)
@@ -174,17 +184,17 @@ func (s *AdmiraltySetter) InitializeAsTarget(ctx context.Context, kubeconfigObj
// 1. Create the namespace
logger.Info().Msg("InitializeAsTarget: creating Namespace " + s.ExecutionsID)
-if err := serv.CreateNamespace(ctx, s.ExecutionsID); err != nil && !apierrors.IsAlreadyExists(err) {
+if err := serv.CreateNamespace(ctx, s.ExecutionsID); err != nil && !strings.Contains(err.Error(), "already exists") {
logger.Error().Msg("InitializeAsTarget: failed to create namespace: " + err.Error())
-emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, "", err)
+emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, kubeconfigObj.SourcePeerID, "", err, self)
return
}
// 2. Create the ServiceAccount sa-{executionID}
logger.Info().Msg("InitializeAsTarget: creating ServiceAccount sa-" + s.ExecutionsID)
-if err := serv.CreateServiceAccount(ctx, s.ExecutionsID); err != nil && !apierrors.IsAlreadyExists(err) {
+if err := serv.CreateServiceAccount(ctx, s.ExecutionsID); err != nil && !strings.Contains(err.Error(), "already exists") {
logger.Error().Msg("InitializeAsTarget: failed to create service account: " + err.Error())
-emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, "", err)
+emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, kubeconfigObj.SourcePeerID, "", err, self)
return
}
@@ -204,18 +214,18 @@ func (s *AdmiraltySetter) InitializeAsTarget(ctx context.Context, kubeconfigObj
{"get", "create", "update"}, {"get", "create", "update"},
{"get"}, {"get"},
{"patch"}}, {"patch"}},
); err != nil && !apierrors.IsAlreadyExists(err) { ); err != nil && !strings.Contains(err.Error(), "already exists") {
logger.Error().Msg("InitializeAsTarget: failed to create role: " + err.Error()) logger.Error().Msg("InitializeAsTarget: failed to create role: " + err.Error())
emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, "", err) emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, kubeconfigObj.SourcePeerID, "", err, self)
return return
} }
// 4. Create the RoleBinding // 4. Create the RoleBinding
rbName := "rb-" + s.ExecutionsID rbName := "rb-" + s.ExecutionsID
logger.Info().Msg("InitializeAsTarget: creating RoleBinding " + rbName) logger.Info().Msg("InitializeAsTarget: creating RoleBinding " + rbName)
if err := serv.CreateRoleBinding(ctx, s.ExecutionsID, rbName, roleName); err != nil && !apierrors.IsAlreadyExists(err) { if err := serv.CreateRoleBinding(ctx, s.ExecutionsID, rbName, roleName); err != nil && !strings.Contains(err.Error(), "already exists") {
logger.Error().Msg("InitializeAsTarget: failed to create role binding: " + err.Error()) logger.Error().Msg("InitializeAsTarget: failed to create role binding: " + err.Error())
emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, "", err) emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, kubeconfigObj.SourcePeerID, "", err, self)
return return
} }
@@ -223,7 +233,7 @@ func (s *AdmiraltySetter) InitializeAsTarget(ctx context.Context, kubeconfigObj
logger.Info().Msg("InitializeAsTarget: creating Secret ns-" + s.ExecutionsID) logger.Info().Msg("InitializeAsTarget: creating Secret ns-" + s.ExecutionsID)
if _, err := serv.CreateKubeconfigSecret(ctx, kubeconfigData, s.ExecutionsID, kubeconfigObj.SourcePeerID); err != nil { if _, err := serv.CreateKubeconfigSecret(ctx, kubeconfigData, s.ExecutionsID, kubeconfigObj.SourcePeerID); err != nil {
logger.Error().Msg("InitializeAsTarget: failed to create kubeconfig secret: " + err.Error()) logger.Error().Msg("InitializeAsTarget: failed to create kubeconfig secret: " + err.Error())
emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, "", err) emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, kubeconfigObj.SourcePeerID, "", err, self)
return return
} }
@@ -235,14 +245,63 @@ func (s *AdmiraltySetter) InitializeAsTarget(ctx context.Context, kubeconfigObj
if err == nil {
err = fmt.Errorf("CreateAdmiraltyTarget returned nil response")
}
-emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, "", err)
+emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, kubeconfigObj.SourcePeerID, "", err, self)
return
}
+// 5. Provision PVCs in the target namespace so Admiralty shadow pods can mount them.
+// The claim names must match what oc-monitord generates: {storageName}-{sourceExecutionsID}.
+if kubeconfigObj.SourceExecutionsID != "" {
+logger.Info().Msg("InitializeAsTarget: provisioning PVCs for source exec " + kubeconfigObj.SourceExecutionsID)
+provisionPVCsForTarget(ctx, s.ExecutionsID, kubeconfigObj.SourceExecutionsID, kubeconfigObj.SourcePeerID)
+}
// Poll until the virtual node appears (inlined from GetNodeReady controller)
logger.Info().Msg("InitializeAsTarget: waiting for virtual node ns-" + s.ExecutionsID)
s.waitForNode(ctx, serv, kubeconfigObj.SourcePeerID)
-emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, kubeconfigData, nil)
+emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, kubeconfigObj.SourcePeerID, kubeconfigData, nil, self)
}
// provisionPVCsForTarget creates PVCs in the Admiralty target namespace for all local
// storages booked under sourceExecutionsID. The claim names use sourceExecutionsID as
// suffix so they match what oc-monitord generates in the workflow spec.
func provisionPVCsForTarget(ctx context.Context, targetNS string, sourceExecutionsID string, peerID string) {
logger := oclib.GetLogger()
res := oclib.NewRequest(oclib.LibDataEnum(oclib.BOOKING), "", peerID, []string{}, nil).
Search(&dbs.Filters{
And: map[string][]dbs.Filter{
"executions_id": {{Operator: dbs.EQUAL.String(), Value: sourceExecutionsID}},
"resource_type": {{Operator: dbs.EQUAL.String(), Value: tools.LIVE_STORAGE.EnumIndex()}},
},
}, "", false)
if res.Err != "" || len(res.Data) == 0 {
return
}
for _, dbo := range res.Data {
b, ok := dbo.(*bookingmodel.Booking)
if !ok {
continue
}
storageName := storage.ResolveStorageName(b.ResourceID, peerID)
if storageName == "" {
continue
}
event := storage.PVCProvisionEvent{
ExecutionsID: targetNS,
StorageID: b.ResourceID,
StorageName: storageName,
SourcePeerID: peerID,
DestPeerID: peerID,
OriginID: peerID,
}
// Use sourceExecutionsID as claim name suffix so it matches oc-monitord's claimName.
setter := storage.NewPVCSetterWithClaimSuffix(b.ResourceID, sourceExecutionsID)
logger.Info().Msgf("InitializeAsTarget: provisioning PVC %s in ns %s", storage.ClaimName(storageName, sourceExecutionsID), targetNS)
setter.InitializeAsSource(ctx, event, true)
}
}
// waitForNode polls GetOneNode until the Admiralty virtual node appears on this cluster. // waitForNode polls GetOneNode until the Admiralty virtual node appears on this cluster.
@@ -325,7 +384,11 @@ func buildHostKubeWithToken(token string) (*models.KubeConfigValue, error) {
if len(token) == 0 {
return nil, fmt.Errorf("buildHostKubeWithToken: empty token")
}
-encodedCA := base64.StdEncoding.EncodeToString([]byte(conf.GetConfig().KubeCA))
+apiHost := conf.GetConfig().KubeExternalHost
+if apiHost == "" {
+apiHost = conf.GetConfig().KubeHost
+}
+encodedCA := conf.GetConfig().KubeCA
return &models.KubeConfigValue{
APIVersion: "v1",
CurrentContext: "default",
@@ -334,7 +397,7 @@ func buildHostKubeWithToken(token string) (*models.KubeConfigValue, error) {
Clusters: []models.KubeconfigNamedCluster{{
Name: "default",
Cluster: models.KubeconfigCluster{
-Server: "https://" + conf.GetConfig().KubeHost + ":6443",
+Server: "https://" + apiHost + ":6443",
CertificateAuthorityData: encodedCA,
},
}},
@@ -348,3 +411,35 @@ func buildHostKubeWithToken(token string) (*models.KubeConfigValue, error) {
}},
}, nil
}
// TeardownIfRemote triggers Admiralty TeardownAsTarget only when at
// least one compute booking for the execution is on a remote peer.
// Local executions do not involve Admiralty.
func (s *AdmiraltySetter) TeardownIfRemote(exec *workflow_execution.WorkflowExecution, selfPeerID string) {
logger := oclib.GetLogger()
res := oclib.NewRequest(oclib.LibDataEnum(oclib.BOOKING), "", selfPeerID, []string{}, nil).
Search(&dbs.Filters{
And: map[string][]dbs.Filter{
"executions_id": {{Operator: dbs.EQUAL.String(), Value: exec.ExecutionsID}},
"resource_type": {{Operator: dbs.EQUAL.String(), Value: tools.COMPUTE_RESOURCE.EnumIndex()}},
},
}, "", false)
if res.Err != "" || len(res.Data) == 0 {
return
}
for _, dbo := range res.Data {
b, ok := dbo.(*bookingmodel.Booking)
if !ok {
continue
}
if b.DestPeerID != selfPeerID {
logger.Info().Msgf("InfraTeardown: Admiralty teardown exec=%s (remote peer=%s)",
exec.ExecutionsID, b.DestPeerID)
s.TeardownAsTarget(context.Background(), selfPeerID)
return // one teardown per execution is enough
}
}
}
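The NATS plumbing between the two sides stays outside this file; below is a minimal sketch of what a target-side handler could do once the published event arrives. The subscription wiring and the handler name are assumptions; the types and methods are the ones added above.

// onKubeconfigEvent is a hypothetical callback invoked with the raw NATS payload on the target peer.
func onKubeconfigEvent(ctx context.Context, payload []byte, localPeerID string) {
var evt models.KubeconfigEvent
if err := json.Unmarshal(payload, &evt); err != nil {
return
}
if evt.DestPeerID != localPeerID {
return // not addressed to this peer
}
// self: the origin is the local peer, so PB_CONSIDERS goes straight to CONSIDERS_EVENT.
self := evt.OriginID == localPeerID
go NewAdmiraltySetter(evt.ExecutionsID).InitializeAsTarget(ctx, evt, self)
}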


@@ -0,0 +1,39 @@
package infrastructure
import (
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/models/allowed_image"
)
// defaultAllowedImages is the list of lightweight utility images allowed to
// persist on every peer without any operator action.
//
// These entries are marked IsDefault:true and cannot be deleted through the
// API; they are under the exclusive control of the platform code.
var defaultAllowedImages = []allowed_image.AllowedImage{
{Image: "natsio/nats-box", TagConstraint: "", IsDefault: true}, // outil NATS utilisé par les native tools
{Image: "library/alpine", TagConstraint: "", IsDefault: true}, // base image légère standard
{Image: "library/busybox", TagConstraint: "", IsDefault: true}, // utilitaire shell minimal
}
// BootstrapAllowedImages inserts the default images when they are missing from
// the database. Existing entries are never modified.
// Call it once at startup, before beego.Run().
func BootstrapAllowedImages() {
req := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.ALLOWED_IMAGE), nil)
for _, img := range defaultAllowedImages {
// Check whether an entry with this image name already exists.
existing := req.Search(&dbs.Filters{
And: map[string][]dbs.Filter{
"image": {{Operator: dbs.EQUAL.String(), Value: img.Image}},
},
}, "", false)
if existing.Err != "" || len(existing.Data) > 0 {
continue // already present, or search error: skip
}
local := img // copy to avoid capturing the loop variable
req.StoreOne(local.Serialize(&local))
}
}
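The bootstrap comment above pins the call site; a hypothetical main() wiring (the package path and the beego entry point are assumptions) would look like:

func main() {
// ... configuration and oc-lib initialisation ...
infrastructure.BootstrapAllowedImages() // seed the IsDefault entries before serving requests
beego.Run()
}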


@@ -1,41 +1,36 @@
package infrastructure
import (
-"context"
+"encoding/json"
"fmt"
"sync"
"time"
-"oc-datacenter/infrastructure/minio"
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/dbs"
bookingmodel "cloud.o-forge.io/core/oc-lib/models/booking"
"cloud.o-forge.io/core/oc-lib/models/common/enum"
-"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
"cloud.o-forge.io/core/oc-lib/tools"
"go.mongodb.org/mongo-driver/bson/primitive"
)
-// processedBookings tracks booking IDs whose start-expiry has already been handled.
-// Resets on restart; teardown methods are idempotent so duplicate runs are safe.
+// processedBookings tracks booking IDs already handled this process lifetime.
var processedBookings sync.Map
-// processedEndBookings tracks booking IDs whose end-expiry (Admiralty source cleanup)
-// has already been triggered in this process lifetime.
-var processedEndBookings sync.Map
-// closingStates is the set of terminal booking states after which infra must be torn down.
-var closingStates = map[enum.BookingStatus]bool{
+// ClosingStates is the set of terminal booking states.
+var ClosingStates = map[enum.BookingStatus]bool{
enum.FAILURE: true,
enum.SUCCESS: true,
enum.FORGOTTEN: true,
enum.CANCELLED: true,
}
-// WatchBookings starts a passive loop that ticks every minute, scans bookings whose
-// ExpectedStartDate + 1 min has passed, transitions them to terminal states when needed,
-// and tears down the associated Kubernetes / Minio infrastructure.
+// WatchBookings is a safety-net fallback for when oc-monitord fails to launch.
+// It detects bookings that are past expected_start_date by at least 1 minute and
+// are still in a non-terminal state. Instead of writing to the database directly,
+// it emits WORKFLOW_STEP_DONE_EVENT with State=FAILURE on NATS so that oc-scheduler
+// handles the state transition — keeping a single source of truth for booking state.
+//
// Must be launched in a goroutine from main.
func WatchBookings() {
logger := oclib.GetLogger()
@@ -43,18 +38,16 @@ func WatchBookings() {
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for range ticker.C {
-if err := scanExpiredBookings(); err != nil {
-logger.Error().Msg("BookingWatchdog: " + err.Error())
-}
-if err := scanEndedExec(); err != nil {
+if err := scanStaleBookings(); err != nil {
logger.Error().Msg("BookingWatchdog: " + err.Error())
}
}
}
-// scanExpiredBookings queries all bookings whose start deadline has passed and
-// dispatches each one to processExpiredBooking.
-func scanExpiredBookings() error {
+// scanStaleBookings queries all bookings whose ExpectedStartDate passed more than
+// 1 minute ago. Non-terminal ones get a WORKFLOW_STEP_DONE_EVENT FAILURE emitted
+// on NATS so oc-scheduler closes them.
+func scanStaleBookings() error {
myself, err := oclib.GetMySelf()
if err != nil {
return fmt.Errorf("could not resolve local peer: %w", err)
@@ -73,7 +66,7 @@ func scanExpiredBookings() error {
}, "", false) }, "", false)
if res.Err != "" { if res.Err != "" {
return fmt.Errorf("booking search failed: %s", res.Err) return fmt.Errorf("stale booking search failed: %s", res.Err)
} }
for _, dbo := range res.Data { for _, dbo := range res.Data {
@@ -81,164 +74,39 @@ func scanExpiredBookings() error {
if !ok {
continue
}
-go processExpiredBooking(b, peerID)
+go emitWatchdogFailure(b)
}
return nil
}
-// processExpiredBooking transitions the booking to a terminal state when applicable,
-// then tears down infrastructure based on the resource type:
-// - LIVE_DATACENTER / COMPUTE_RESOURCE → Admiralty (as target) + Minio (as target)
-// - LIVE_STORAGE / STORAGE_RESOURCE → Minio (as source)
-func processExpiredBooking(b *bookingmodel.Booking, peerID string) {
+// emitWatchdogFailure publishes a WORKFLOW_STEP_DONE_EVENT FAILURE for a stale
+// booking. oc-scheduler is the single authority for booking state transitions.
+func emitWatchdogFailure(b *bookingmodel.Booking) {
logger := oclib.GetLogger()
-ctx := context.Background()
-// Skip bookings already handled during this process lifetime.
if _, done := processedBookings.Load(b.GetID()); done {
return
}
+if ClosingStates[b.State] {
+processedBookings.Store(b.GetID(), struct{}{})
+return
+}
+now := time.Now().UTC()
+payload, err := json.Marshal(tools.WorkflowLifecycleEvent{
+BookingID: b.GetID(),
+State: enum.FAILURE.EnumIndex(),
+RealEnd: &now,
+})
+if err != nil {
+return
+}
+tools.NewNATSCaller().SetNATSPub(tools.WORKFLOW_STEP_DONE_EVENT, tools.NATSResponse{
+FromApp: "oc-datacenter",
+Method: int(tools.WORKFLOW_STEP_DONE_EVENT),
+Payload: payload,
+})
+logger.Info().Msgf("BookingWatchdog: booking %s stale → emitting FAILURE", b.GetID())
+processedBookings.Store(b.GetID(), struct{}{})
+}
-// Transition non-terminal bookings.
-if !closingStates[b.State] {
-var newState enum.BookingStatus
-switch b.State {
-case enum.DRAFT, enum.DELAYED:
-// DRAFT: never launched; DELAYED: was SCHEDULED but start never arrived.
-newState = enum.FORGOTTEN
-case enum.SCHEDULED:
-// Passed its start date without ever being launched.
-newState = enum.FAILURE
-case enum.STARTED:
-// A running booking is never auto-closed by the watchdog.
-return
-default:
-return
-}
-upd := oclib.NewRequest(oclib.LibDataEnum(oclib.BOOKING), "", peerID, []string{}, nil).
-UpdateOne(map[string]any{"state": newState.EnumIndex()}, b.GetID())
-if upd.Err != "" {
-logger.Error().Msgf("BookingWatchdog: failed to update booking %s: %s", b.GetID(), upd.Err)
-return
-}
-b.State = newState
-logger.Info().Msgf("BookingWatchdog: booking %s (exec=%s, type=%s) → %s",
-b.GetID(), b.ExecutionsID, b.ResourceType, b.State)
-}
-// Mark as handled before triggering async teardown (avoids double-trigger on next tick).
-processedBookings.Store(b.GetID(), struct{}{})
-// Tear down infrastructure according to resource type.
-switch b.ResourceType {
-case tools.LIVE_DATACENTER, tools.COMPUTE_RESOURCE:
-logger.Info().Msgf("BookingWatchdog: tearing down compute infra exec=%s", b.ExecutionsID)
-go NewAdmiraltySetter(b.ExecutionsID).TeardownAsSource(ctx) // i'm the compute units.
-go teardownMinioForComputeBooking(ctx, b, peerID)
-case tools.LIVE_STORAGE, tools.STORAGE_RESOURCE:
-logger.Info().Msgf("BookingWatchdog: tearing down storage infra exec=%s", b.ExecutionsID)
-go teardownMinioSourceBooking(ctx, b, peerID)
-}
-}
-// scanEndedBookings queries LIVE_DATACENTER / COMPUTE_RESOURCE bookings whose
-// ExpectedEndDate + 1 min has passed and triggers TeardownAsSource for Admiralty,
-// cleaning up the compute-side namespace once the execution window is over.
-func scanEndedExec() error {
-myself, err := oclib.GetMySelf()
-if err != nil {
-return fmt.Errorf("could not resolve local peer: %w", err)
-}
-peerID := myself.GetID()
-res := oclib.NewRequest(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), "", peerID, []string{}, nil).
-Search(&dbs.Filters{
-And: map[string][]dbs.Filter{
-// Only compute bookings require Admiralty source cleanup.
-"state": {{
-Operator: dbs.GT.String(),
-Value: 2,
-}},
-},
-}, "", false)
-if res.Err != "" {
-return fmt.Errorf("ended-booking search failed: %s", res.Err)
-}
-for _, dbo := range res.Data {
-b, ok := dbo.(*workflow_execution.WorkflowExecution)
-if !ok {
-continue
-}
-go teardownAdmiraltyTarget(b)
-}
-return nil
-}
-// teardownAdmiraltySource triggers TeardownAsSource for the compute-side namespace
-// of an execution whose expected end date has passed.
-func teardownAdmiraltyTarget(b *workflow_execution.WorkflowExecution) {
-logger := oclib.GetLogger()
-// Each executionsID is processed at most once per process lifetime.
-if _, done := processedEndBookings.Load(b.ExecutionsID); done {
-return
-}
-processedEndBookings.Store(b.ExecutionsID, struct{}{})
-logger.Info().Msgf("BookingWatchdog: tearing down Admiralty source exec=%s (booking=%s)",
-b.ExecutionsID, b.GetID())
-if p, err := oclib.GetMySelf(); err == nil {
-NewAdmiraltySetter(b.ExecutionsID).TeardownAsTarget(context.Background(), p.GetID())
-}
-}
-// teardownMinioForComputeBooking finds the LIVE_STORAGE bookings belonging to the same
-// execution and triggers Minio-as-target teardown for each (K8s secret + configmap).
-// The Minio-as-source side is handled separately by the storage booking's own watchdog pass.
-func teardownMinioForComputeBooking(ctx context.Context, computeBooking *bookingmodel.Booking, localPeerID string) {
-logger := oclib.GetLogger()
-res := oclib.NewRequest(oclib.LibDataEnum(oclib.BOOKING), "", localPeerID, []string{}, nil).
-Search(&dbs.Filters{
-And: map[string][]dbs.Filter{
-"executions_id": {{Operator: dbs.EQUAL.String(), Value: computeBooking.ExecutionsID}},
-"resource_type": {{Operator: dbs.EQUAL.String(), Value: tools.LIVE_STORAGE.EnumIndex()}},
-},
-}, "", false)
-if res.Err != "" || len(res.Data) == 0 {
-logger.Warn().Msgf("BookingWatchdog: no storage booking found for exec=%s", computeBooking.ExecutionsID)
-return
-}
-for _, dbo := range res.Data {
-sb, ok := dbo.(*bookingmodel.Booking)
-if !ok {
-continue
-}
-event := minio.MinioDeleteEvent{
-ExecutionsID: computeBooking.ExecutionsID,
-MinioID: sb.ResourceID,
-SourcePeerID: sb.DestPeerID, // peer hosting Minio
-DestPeerID: localPeerID, // this peer (compute/target)
-OriginID: "",
-}
-minio.NewMinioSetter(computeBooking.ExecutionsID, sb.ResourceID).TeardownAsTarget(ctx, event)
-}
-}
-// teardownMinioSourceBooking triggers Minio-as-source teardown for a storage booking:
-// revokes the scoped service account and removes the execution bucket on this Minio host.
-func teardownMinioSourceBooking(ctx context.Context, b *bookingmodel.Booking, localPeerID string) {
-event := minio.MinioDeleteEvent{
-ExecutionsID: b.ExecutionsID,
-MinioID: b.ResourceID,
-SourcePeerID: localPeerID, // this peer IS the Minio host
-DestPeerID: b.DestPeerID,
-OriginID: "",
-}
-minio.NewMinioSetter(b.ExecutionsID, b.ResourceID).TeardownAsSource(ctx, event)
-}


@@ -0,0 +1,323 @@
package kubernetes
import (
"context"
"encoding/base64"
"fmt"
"strings"
"sync"
"time"
"oc-datacenter/conf"
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/models/allowed_image"
"cloud.o-forge.io/core/oc-lib/tools"
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
type KubernetesService struct {
ExecutionsID string
}
func NewKubernetesService(executionsID string) *KubernetesService {
return &KubernetesService{
ExecutionsID: executionsID,
}
}
// prepullRegistry maps executionsID → images pre-pulled for that run.
// Used by CleanupImages after WORKFLOW_DONE_EVENT.
var prepullRegistry sync.Map
// RunPrepull creates a k8s Job in the executionsID namespace that pre-pulls every
// image in the list (imagePullPolicy: IfNotPresent). It blocks until the Job
// completes or times out (5 min), and records the images for post-execution cleanup.
func (s *KubernetesService) RunPrepull(ctx context.Context, images []string) error {
logger := oclib.GetLogger()
// Always record the images for cleanup, even if the pull fails.
prepullRegistry.Store(s.ExecutionsID, images)
if len(images) == 0 {
return nil
}
cs, err := s.newClientset()
if err != nil {
return fmt.Errorf("RunPrepull: failed to build clientset: %w", err)
}
// One container per image; they all run in parallel inside the same pod.
containers := make([]corev1.Container, 0, len(images))
for i, img := range images {
containers = append(containers, corev1.Container{
Name: fmt.Sprintf("prepull-%d", i),
Image: img,
ImagePullPolicy: corev1.PullIfNotPresent,
Command: []string{"true"},
})
}
var backoff int32 = 0
jobName := "prepull-" + s.ExecutionsID
job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: jobName,
Namespace: s.ExecutionsID,
},
Spec: batchv1.JobSpec{
BackoffLimit: &backoff,
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
RestartPolicy: corev1.RestartPolicyNever,
Containers: containers,
},
},
},
}
if _, err := cs.BatchV1().Jobs(s.ExecutionsID).Create(ctx, job, metav1.CreateOptions{}); err != nil {
return fmt.Errorf("RunPrepull: failed to create job: %w", err)
}
timeout := int64(300) // 5 min, consistent with waitForConsiders
watcher, err := cs.BatchV1().Jobs(s.ExecutionsID).Watch(ctx, metav1.ListOptions{
FieldSelector: "metadata.name=" + jobName,
TimeoutSeconds: &timeout,
})
if err != nil {
return fmt.Errorf("RunPrepull: failed to watch job: %w", err)
}
defer watcher.Stop()
for event := range watcher.ResultChan() {
j, ok := event.Object.(*batchv1.Job)
if !ok {
continue
}
for _, cond := range j.Status.Conditions {
if cond.Type == batchv1.JobComplete && cond.Status == corev1.ConditionTrue {
logger.Info().Msgf("RunPrepull: job %s completed for ns %s", jobName, s.ExecutionsID)
return nil
}
if cond.Type == batchv1.JobFailed && cond.Status == corev1.ConditionTrue {
return fmt.Errorf("RunPrepull: job %s failed for ns %s", jobName, s.ExecutionsID)
}
}
}
return fmt.Errorf("RunPrepull: timeout waiting for job %s", jobName)
}
// CleanupImages looks up the images pre-pulled for this run, filters out the ones
// absent from AllowedImages, and schedules their removal on every node of the
// cluster through a privileged DaemonSet (crictl rmi).
// Called from teardownInfraForExecution on WORKFLOW_DONE_EVENT.
func (s *KubernetesService) CleanupImages(ctx context.Context) {
logger := oclib.GetLogger()
raw, ok := prepullRegistry.LoadAndDelete(s.ExecutionsID)
if !ok {
return
}
images := raw.([]string)
if len(images) == 0 {
return
}
toRemove := s.filterNonAllowed(images)
if len(toRemove) == 0 {
logger.Info().Msgf("CleanupImages: all images for %s are in AllowedImages, keeping", s.ExecutionsID)
return
}
logger.Info().Msgf("CleanupImages: scheduling removal of %d image(s) for %s: %v",
len(toRemove), s.ExecutionsID, toRemove)
go s.scheduleImageRemoval(ctx, toRemove)
}
// filterNonAllowed returns the images that are not present in AllowedImages.
func (s *KubernetesService) filterNonAllowed(images []string) []string {
var toRemove []string
for _, img := range images {
registry, name, tag := s.parseImage(img)
res := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.ALLOWED_IMAGE), nil).Search(
&dbs.Filters{
And: map[string][]dbs.Filter{
"image": {{Operator: dbs.EQUAL.String(), Value: name}},
},
}, "", false)
if len(res.Data) == 0 {
toRemove = append(toRemove, img)
continue
}
allowed := false
for _, d := range res.Data {
a, ok := d.(*allowed_image.AllowedImage)
if !ok {
continue
}
if a.Registry != "" && a.Registry != registry {
continue
}
if s.matchesTagConstraint(a.TagConstraint, tag) {
allowed = true
break
}
}
if !allowed {
toRemove = append(toRemove, img)
}
}
return toRemove
}
// scheduleImageRemoval creates a privileged DaemonSet on every node of the cluster
// that runs "crictl rmi" for each image to remove, then deletes the DaemonSet.
func (s *KubernetesService) scheduleImageRemoval(ctx context.Context, images []string) {
logger := oclib.GetLogger()
cs, err := s.newClientset()
if err != nil {
logger.Error().Msgf("scheduleImageRemoval: failed to build clientset: %v", err)
return
}
// Shell command: crictl rmi image1 image2 ... || true (best-effort)
args := strings.Join(images, " ")
cmd := fmt.Sprintf("crictl rmi %s || true", args)
privileged := true
dsName := "oc-cleanup-" + s.ExecutionsID
ds := &appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Name: dsName,
Namespace: "default",
Labels: map[string]string{"app": dsName},
},
Spec: appsv1.DaemonSetSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{"app": dsName},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{"app": dsName},
},
Spec: corev1.PodSpec{
// Tolerate every taint so the pod is scheduled on all nodes.
Tolerations: []corev1.Toleration{
{Operator: corev1.TolerationOpExists},
},
HostPID: true,
Containers: []corev1.Container{{
Name: "cleanup",
Image: "alpine:3",
// nsenter enters the host's mount namespace (PID 1)
// to reach the crictl binary installed on the node.
Command: []string{"sh", "-c",
"nsenter -t 1 -m -u -i -n -- sh -c '" + cmd + "'"},
SecurityContext: &corev1.SecurityContext{
Privileged: &privileged,
},
}},
},
},
},
}
if _, err := cs.AppsV1().DaemonSets("default").Create(ctx, ds, metav1.CreateOptions{}); err != nil {
logger.Error().Msgf("scheduleImageRemoval: failed to create DaemonSet: %v", err)
return
}
// Give the DaemonSet time to run on every node.
time.Sleep(30 * time.Second)
if err := cs.AppsV1().DaemonSets("default").Delete(ctx, dsName, metav1.DeleteOptions{}); err != nil {
logger.Error().Msgf("scheduleImageRemoval: failed to delete DaemonSet: %v", err)
}
logger.Info().Msgf("scheduleImageRemoval: completed for %s", s.ExecutionsID)
}
// parseImage splits "registry/name:tag" into its three components.
// registry is empty when no hostname-like component is detected.
func (s *KubernetesService) parseImage(image string) (registry, name, tag string) {
nameWithRegistry := image
tag = "latest"
// The tag is everything after the last ":" that follows the last "/", so a
// registry port ("localhost:5000/foo") is not mistaken for a tag.
if idx := strings.LastIndex(image, ":"); idx > strings.LastIndex(image, "/") {
nameWithRegistry = image[:idx]
tag = image[idx+1:]
}
slashIdx := strings.Index(nameWithRegistry, "/")
if slashIdx == -1 {
return "", nameWithRegistry, tag
}
prefix := nameWithRegistry[:slashIdx]
// A "." or ":" in the first component, or "localhost", means it is a registry hostname.
if strings.ContainsAny(prefix, ".:") || prefix == "localhost" {
return prefix, nameWithRegistry[slashIdx+1:], tag
}
return "", nameWithRegistry, tag
}
}
// matchesTagConstraint reports whether tag satisfies the constraint.
// Empty = every version. Supports exact match and suffix glob ("3.*").
func (s *KubernetesService) matchesTagConstraint(constraint, tag string) bool {
if constraint == "" {
return true
}
if strings.HasSuffix(constraint, "*") {
return strings.HasPrefix(tag, strings.TrimSuffix(constraint, "*"))
}
return constraint == tag
}
// newClientset builds a k8s client from the base64-encoded credentials in conf.
func (s *KubernetesService) newClientset() (*kubernetes.Clientset, error) {
caData, err := base64.StdEncoding.DecodeString(conf.GetConfig().KubeCA)
if err != nil {
return nil, fmt.Errorf("newClientset: invalid KubeCA: %w", err)
}
certData, err := base64.StdEncoding.DecodeString(conf.GetConfig().KubeCert)
if err != nil {
return nil, fmt.Errorf("newClientset: invalid KubeCert: %w", err)
}
keyData, err := base64.StdEncoding.DecodeString(conf.GetConfig().KubeData)
if err != nil {
return nil, fmt.Errorf("newClientset: invalid KubeData: %w", err)
}
cfg := &rest.Config{
Host: "https://" + conf.GetConfig().KubeHost + ":" + conf.GetConfig().KubePort,
TLSClientConfig: rest.TLSClientConfig{
CAData: caData,
CertData: certData,
KeyData: keyData,
},
}
return kubernetes.NewForConfig(cfg)
}
func (s *KubernetesService) CreateNamespace() error {
logger := oclib.GetLogger()
serv, err := tools.NewKubernetesService(
conf.GetConfig().KubeHost+":"+conf.GetConfig().KubePort, conf.GetConfig().KubeCA,
conf.GetConfig().KubeCert, conf.GetConfig().KubeData)
if err != nil {
logger.Error().Msg("CreateNamespace: failed to init k8s service: " + err.Error())
return err
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
return serv.ProvisionExecutionNamespace(ctx, s.ExecutionsID)
}
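A test-style sketch, inside the same package, of what the two helpers above return; the values follow directly from parseImage and matchesTagConstraint as written.

func ExampleImageHelpers() {
s := NewKubernetesService("exec-1")

reg, name, tag := s.parseImage("ghcr.io/foo/bar:1.2")
// reg == "ghcr.io", name == "foo/bar", tag == "1.2"

reg, name, tag = s.parseImage("alpine")
// reg == "", name == "alpine", tag == "latest"

_ = s.matchesTagConstraint("3.*", "3.19") // true: suffix glob
_ = s.matchesTagConstraint("", "any")     // true: empty constraint allows every tag
_ = s.matchesTagConstraint("1.0", "1.1")  // false: exact match required
_, _, _ = reg, name, tag
}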


@@ -15,12 +15,11 @@ type KubeConfigValue struct {
type KubeconfigUser struct {
Name string `yaml:"name" json:"name"`
User KubeconfigUserKeyPair `yaml:"user" json:"user"`
}
// KubeconfigUserKeyPair is a struct used to create a kubectl configuration YAML file
type KubeconfigUserKeyPair struct {
Token string `yaml:"token" json:"token"`
}
// KubeconfigAuthProvider is a struct used to create a kubectl authentication provider // KubeconfigAuthProvider is a struct used to create a kubectl authentication provider
@@ -54,3 +53,20 @@ type KubeconfigContext struct {
Namespace string `yaml:"namespace,omitempty" json:"namespace,omitempty"`
User string `yaml:"user" json:"user"`
}
// KubeconfigEvent is the NATS payload used to transfer the kubeconfig from the source peer to the target peer.
type KubeconfigEvent struct {
DestPeerID string `json:"dest_peer_id"`
ExecutionsID string `json:"executions_id"`
Kubeconfig string `json:"kubeconfig"`
SourcePeerID string `json:"source_peer_id"`
// OriginID is the peer that initiated the provisioning request.
// The PB_CONSIDERS response is routed back to this peer.
OriginID string `json:"origin_id"`
// SourceExecutionsID is the execution namespace on the source cluster.
// Used by the target to provision PVCs with the correct claim name.
SourceExecutionsID string `json:"source_executions_id,omitempty"`
// Images is the list of container images to pre-pull on the compute peer
// before the workflow starts.
Images []string `json:"images,omitempty"`
}


@@ -0,0 +1,359 @@
package kubernetes
import (
"context"
"fmt"
"regexp"
"strings"
"time"
"oc-datacenter/conf"
"oc-datacenter/infrastructure"
"oc-datacenter/infrastructure/admiralty"
"oc-datacenter/infrastructure/storage"
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/dbs"
bookingmodel "cloud.o-forge.io/core/oc-lib/models/booking"
"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
"cloud.o-forge.io/core/oc-lib/tools"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// uuidNsPattern matches Kubernetes namespace names that are execution UUIDs.
var uuidNsPattern = regexp.MustCompile(`^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$`)
// WatchInfra is a safety-net watchdog that periodically scans Kubernetes for
// execution namespaces whose WorkflowExecution has reached a terminal state
// but whose infra was never torn down (e.g. because WORKFLOW_DONE_EVENT was
// missed due to oc-monitord or oc-datacenter crash/restart).
//
// Must be launched in a goroutine from main.
func (s *KubernetesService) Watch() {
logger := oclib.GetLogger()
logger.Info().Msg("InfraWatchdog: started")
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
for range ticker.C {
if err := s.scanOrphaned(); err != nil {
logger.Error().Msg("InfraWatchdog: " + err.Error())
}
if err := s.scanOrphanedMinio(); err != nil {
logger.Error().Msg("InfraWatchdog(minio): " + err.Error())
}
if err := s.scanOrphanedAdmiraltyNodes(); err != nil {
logger.Error().Msg("InfraWatchdog(admiralty-nodes): " + err.Error())
}
if err := s.scanOrphanedPVC(); err != nil {
logger.Error().Msg("InfraWatchdog(pvc): " + err.Error())
}
}
}
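main.go (further down in this commit) launches the loop via go infrastructure.WatchInfra(). A plausible shape for that wrapper, assuming the KubernetesService constructor used in infrastructure/nats/nats.go and that an empty executions ID is acceptable for a pure scan loop, would be:

// Hypothetical wrapper, not taken verbatim from the commit.
func WatchInfra() {
	// The watchdog rediscovers namespaces, bookings and nodes on every tick,
	// so no specific executions ID is assumed to be needed here.
	kubernetes.NewKubernetesService("").Watch()
}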
// scanOrphaned lists all UUID-named Kubernetes namespaces, looks up their
// WorkflowExecution in the DB, and triggers teardown for any that are in a
// terminal state. Namespaces already in Terminating phase are skipped.
func (s *KubernetesService) scanOrphaned() error {
logger := oclib.GetLogger()
serv, err := tools.NewKubernetesService(
conf.GetConfig().KubeHost+":"+conf.GetConfig().KubePort,
conf.GetConfig().KubeCA,
conf.GetConfig().KubeCert,
conf.GetConfig().KubeData,
)
if err != nil {
return fmt.Errorf("failed to init k8s service: %w", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
nsList, err := serv.Set.CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
if err != nil {
return fmt.Errorf("failed to list namespaces: %w", err)
}
myself, err := oclib.GetMySelf()
if err != nil {
return fmt.Errorf("could not resolve local peer: %w", err)
}
peerID := myself.GetID()
for _, ns := range nsList.Items {
executionsID := ns.Name
if !uuidNsPattern.MatchString(executionsID) {
continue
}
// Skip namespaces already being deleted by a previous teardown.
if ns.Status.Phase == v1.NamespaceTerminating {
continue
}
exec := findTerminalExecution(executionsID, peerID)
if exec == nil {
continue
}
logger.Info().Msgf("InfraWatchdog: orphaned infra detected for execution %s (state=%v) → teardown",
executionsID, exec.State)
go s.TeardownForExecution(exec.GetID())
}
return nil
}
// scanOrphanedMinio scans LIVE_STORAGE bookings for executions that are in a
// terminal state and triggers Minio teardown for each unique executionsID found.
// This covers the case where the Kubernetes namespace is already gone (manual
// deletion, prior partial teardown) but Minio SA and bucket were never revoked.
func (s *KubernetesService) scanOrphanedMinio() error {
logger := oclib.GetLogger()
myself, err := oclib.GetMySelf()
if err != nil {
return fmt.Errorf("could not resolve local peer: %w", err)
}
peerID := myself.GetID()
res := oclib.NewRequest(oclib.LibDataEnum(oclib.BOOKING), "", peerID, []string{}, nil).
Search(&dbs.Filters{
And: map[string][]dbs.Filter{
"resource_type": {{Operator: dbs.EQUAL.String(), Value: tools.LIVE_STORAGE.EnumIndex()}},
},
}, "", false)
if res.Err != "" {
return fmt.Errorf("failed to search LIVE_STORAGE bookings: %s", res.Err)
}
// Collect unique executionsIDs to avoid redundant teardowns.
seen := map[string]bool{}
ctx := context.Background()
for _, dbo := range res.Data {
b, ok := dbo.(*bookingmodel.Booking)
if !ok || seen[b.ExecutionsID] {
continue
}
exec := findTerminalExecution(b.ExecutionsID, peerID)
if exec == nil {
continue
}
seen[b.ExecutionsID] = true
minio := storage.NewMinioSetter(b.ExecutionsID, b.ResourceID)
// Determine this peer's role and call the appropriate teardown.
if b.DestPeerID == peerID {
logger.Info().Msgf("InfraWatchdog(minio): orphaned target resources for exec %s → TeardownAsTarget", b.ExecutionsID)
event := storage.MinioDeleteEvent{
ExecutionsID: b.ExecutionsID,
MinioID: b.ResourceID,
SourcePeerID: b.DestPeerID,
DestPeerID: peerID,
}
go minio.TeardownAsTarget(ctx, event)
} else {
logger.Info().Msgf("InfraWatchdog(minio): orphaned source resources for exec %s → TeardownAsSource", b.ExecutionsID)
event := storage.MinioDeleteEvent{
ExecutionsID: b.ExecutionsID,
MinioID: b.ResourceID,
SourcePeerID: peerID,
DestPeerID: b.DestPeerID,
}
go minio.TeardownAsSource(ctx, event)
}
}
return nil
}
// scanOrphanedAdmiraltyNodes lists all Kubernetes nodes, identifies Admiralty
// virtual nodes (name prefix "admiralty-{UUID}-") that are NotReady, and
// explicitly deletes them when their WorkflowExecution is in a terminal state.
//
// This covers the gap where the namespace is already gone (or Terminating) but
// the virtual node was never cleaned up by the Admiralty controller — which can
// happen when the node goes NotReady before the AdmiraltyTarget CRD is deleted.
func (s *KubernetesService) scanOrphanedAdmiraltyNodes() error {
logger := oclib.GetLogger()
serv, err := tools.NewKubernetesService(
conf.GetConfig().KubeHost+":"+conf.GetConfig().KubePort,
conf.GetConfig().KubeCA,
conf.GetConfig().KubeCert,
conf.GetConfig().KubeData,
)
if err != nil {
return fmt.Errorf("failed to init k8s service: %w", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
nodeList, err := serv.Set.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
if err != nil {
return fmt.Errorf("failed to list nodes: %w", err)
}
myself, err := oclib.GetMySelf()
if err != nil {
return fmt.Errorf("could not resolve local peer: %w", err)
}
peerID := myself.GetID()
for _, node := range nodeList.Items {
// Admiralty virtual nodes are named: admiralty-{executionID}-target-{...}
rest := strings.TrimPrefix(node.Name, "admiralty-")
if rest == node.Name {
continue // not an admiralty node
}
// UUID is exactly 36 chars: 8-4-4-4-12
if len(rest) < 36 {
continue
}
executionsID := rest[:36]
if !uuidNsPattern.MatchString(executionsID) {
continue
}
// Only act on NotReady nodes.
ready := false
for _, cond := range node.Status.Conditions {
if cond.Type == v1.NodeReady {
ready = cond.Status == v1.ConditionTrue
break
}
}
if ready {
continue
}
exec := findTerminalExecution(executionsID, peerID)
if exec == nil {
continue
}
logger.Info().Msgf("InfraWatchdog(admiralty-nodes): NotReady orphaned node %s for terminal execution %s → deleting",
node.Name, executionsID)
if delErr := serv.Set.CoreV1().Nodes().Delete(ctx, node.Name, metav1.DeleteOptions{}); delErr != nil {
logger.Error().Msgf("InfraWatchdog(admiralty-nodes): failed to delete node %s: %v", node.Name, delErr)
}
}
return nil
}
// scanOrphanedPVC scans LIVE_STORAGE bookings for executions that are in a
// terminal state and triggers PVC teardown for each one where this peer holds
// the local storage. This covers the case where the Kubernetes namespace was
// already deleted (or its teardown was partial) but the PersistentVolume
// (cluster-scoped) was never reclaimed.
//
// A LIVE_STORAGE booking is treated as a local PVC only when ResolveStorageName
// returns a non-empty name — the same guard used by teardownPVCForExecution.
func (s *KubernetesService) scanOrphanedPVC() error {
logger := oclib.GetLogger()
myself, err := oclib.GetMySelf()
if err != nil {
return fmt.Errorf("could not resolve local peer: %w", err)
}
peerID := myself.GetID()
res := oclib.NewRequest(oclib.LibDataEnum(oclib.BOOKING), "", peerID, []string{}, nil).
Search(&dbs.Filters{
And: map[string][]dbs.Filter{
"resource_type": {{Operator: dbs.EQUAL.String(), Value: tools.LIVE_STORAGE.EnumIndex()}},
},
}, "", false)
if res.Err != "" {
return fmt.Errorf("failed to search LIVE_STORAGE bookings: %s", res.Err)
}
seen := map[string]bool{}
ctx := context.Background()
for _, dbo := range res.Data {
b, ok := dbo.(*bookingmodel.Booking)
if !ok || seen[b.ExecutionsID+b.ResourceID] {
continue
}
storageName := storage.ResolveStorageName(b.ResourceID, peerID)
if storageName == "" {
continue // not a local PVC booking
}
exec := findTerminalExecution(b.ExecutionsID, peerID)
if exec == nil {
continue
}
seen[b.ExecutionsID+b.ResourceID] = true
logger.Info().Msgf("InfraWatchdog(pvc): orphaned PVC for exec %s storage %s → TeardownAsSource",
b.ExecutionsID, b.ResourceID)
event := storage.PVCDeleteEvent{
ExecutionsID: b.ExecutionsID,
StorageID: b.ResourceID,
StorageName: storageName,
SourcePeerID: peerID,
DestPeerID: b.DestPeerID,
}
go storage.NewPVCSetter(b.ExecutionsID, b.ResourceID).TeardownAsSource(ctx, event)
}
return nil
}
// findTerminalExecution returns the WorkflowExecution for the given executionsID
// if it exists in the DB and is in a terminal state, otherwise nil.
func findTerminalExecution(executionsID string, peerID string) *workflow_execution.WorkflowExecution {
res := oclib.NewRequest(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), "", peerID, []string{}, nil).
Search(&dbs.Filters{
And: map[string][]dbs.Filter{
"executions_id": {{Operator: dbs.EQUAL.String(), Value: executionsID}},
},
}, "", false)
if res.Err != "" || len(res.Data) == 0 {
return nil
}
exec, ok := res.Data[0].(*workflow_execution.WorkflowExecution)
if !ok {
return nil
}
if !infrastructure.ClosingStates[exec.State] {
return nil
}
return exec
}
// TeardownForExecution handles infrastructure cleanup when a workflow terminates.
// oc-datacenter is responsible only for infra here; booking/execution state
// is managed by oc-scheduler.
func (s *KubernetesService) TeardownForExecution(executionID string) {
logger := oclib.GetLogger()
myself, err := oclib.GetMySelf()
if err != nil || myself == nil {
return
}
selfPeerID := myself.GetID()
adminReq := &tools.APIRequest{Admin: true}
res, _, loadErr := workflow_execution.NewAccessor(adminReq).LoadOne(executionID)
if loadErr != nil || res == nil {
logger.Warn().Msgf("teardownInfraForExecution: execution %s not found", executionID)
return
}
exec := res.(*workflow_execution.WorkflowExecution)
ctx := context.Background()
admiralty.NewAdmiraltySetter(s.ExecutionsID).TeardownIfRemote(exec, selfPeerID)
storage.NewMinioSetter(s.ExecutionsID, "").TeardownForExecution(ctx, selfPeerID)
storage.NewPVCSetter(s.ExecutionsID, "").TeardownForExecution(ctx, selfPeerID)
s.CleanupImages(ctx)
}

View File

@@ -1,25 +0,0 @@
package infrastructure
import (
"context"
"oc-datacenter/conf"
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/tools"
)
// ---------------------------------------------------------------------------
// Kubernetes namespace helper
// ---------------------------------------------------------------------------
func CreateNamespace(ns string) error {
logger := oclib.GetLogger()
serv, err := tools.NewKubernetesService(
conf.GetConfig().KubeHost+":"+conf.GetConfig().KubePort, conf.GetConfig().KubeCA,
conf.GetConfig().KubeCert, conf.GetConfig().KubeData)
if err != nil {
logger.Error().Msg("CreateNamespace: failed to init k8s service: " + err.Error())
return err
}
return serv.ProvisionExecutionNamespace(context.Background(), ns)
}

View File

@@ -1,167 +0,0 @@
package infrastructure
import (
"context"
"encoding/json"
"fmt"
"oc-datacenter/infrastructure/minio"
"sync"
"cloud.o-forge.io/core/oc-lib/tools"
)
// roleWaiters maps executionID → channel expecting the role-assignment message from OC discovery.
var roleWaiters sync.Map
// ArgoKubeEvent carries the peer-routing metadata for a resource provisioning event.
//
// When MinioID is non-empty the event concerns Minio credential provisioning;
// otherwise it concerns Admiralty kubeconfig provisioning.
type ArgoKubeEvent struct {
ExecutionsID string `json:"executions_id"`
DestPeerID string `json:"dest_peer_id"`
Type tools.DataType `json:"data_type"`
SourcePeerID string `json:"source_peer_id"`
MinioID string `json:"minio_id,omitempty"`
// OriginID is the peer that initiated the request; the PB_CONSIDERS
// response is routed back to this peer once provisioning completes.
OriginID string `json:"origin_id,omitempty"`
}
// ListenNATS starts all NATS subscriptions for the infrastructure layer.
// Must be launched in a goroutine from main.
func ListenNATS() {
tools.NewNATSCaller().ListenNats(map[tools.NATSMethod]func(tools.NATSResponse){
// ─── ARGO_KUBE_EVENT ────────────────────────────────────────────────────────
// Triggered by oc-discovery to notify this peer of a provisioning task.
// Dispatches to Admiralty or Minio based on whether MinioID is set.
tools.ARGO_KUBE_EVENT: func(resp tools.NATSResponse) {
argo := &ArgoKubeEvent{}
if err := json.Unmarshal(resp.Payload, argo); err != nil {
return
}
if argo.Type == tools.STORAGE_RESOURCE {
fmt.Println("DETECT STORAGE ARGO_KUBE_EVENT")
// ── Minio credential provisioning ──────────────────────────────
setter := minio.NewMinioSetter(argo.ExecutionsID, argo.MinioID)
if argo.SourcePeerID == argo.DestPeerID {
fmt.Println("CONFIG MYSELF")
err := CreateNamespace(argo.ExecutionsID)
fmt.Println("NS", err)
// Same peer: source creates credentials and immediately stores them.
go setter.InitializeAsSource(context.Background(), argo.SourcePeerID, argo.DestPeerID, argo.OriginID)
} else {
// Different peers: publish Phase-1 PB_MINIO_CONFIG (Access == "")
// so oc-discovery routes the role-assignment to the Minio host.
phase1 := minio.MinioCredentialEvent{
ExecutionsID: argo.ExecutionsID,
MinioID: argo.MinioID,
SourcePeerID: argo.SourcePeerID,
DestPeerID: argo.DestPeerID,
OriginID: argo.OriginID,
}
if b, err := json.Marshal(phase1); err == nil {
if b2, err := json.Marshal(&tools.PropalgationMessage{
Payload: b,
Action: tools.PB_MINIO_CONFIG,
}); err == nil {
fmt.Println("CONFIG THEM")
go tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{
FromApp: "oc-datacenter",
Datatype: -1,
User: resp.User,
Method: int(tools.PROPALGATION_EVENT),
Payload: b2,
})
}
}
}
} else {
fmt.Println("DETECT COMPUTE ARGO_KUBE_EVENT")
// ── Admiralty kubeconfig provisioning (existing behaviour) ──────
if argo.SourcePeerID == argo.DestPeerID {
fmt.Println("CONFIG MYSELF")
err := CreateNamespace(argo.ExecutionsID)
fmt.Println("NS", err)
go NewAdmiraltySetter(argo.ExecutionsID).InitializeAsSource(
context.Background(), argo.SourcePeerID, argo.DestPeerID, argo.OriginID)
} else if b, err := json.Marshal(argo); err == nil {
if b2, err := json.Marshal(&tools.PropalgationMessage{
Payload: b,
Action: tools.PB_ADMIRALTY_CONFIG,
}); err == nil {
fmt.Println("CONFIG THEM")
go tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{
FromApp: "oc-datacenter",
Datatype: -1,
User: resp.User,
Method: int(tools.PROPALGATION_EVENT),
Payload: b2,
})
}
}
}
},
// ─── ADMIRALTY_CONFIG_EVENT ─────────────────────────────────────────────────
// Forwarded by oc-discovery after receiving via libp2p ProtocolAdmiraltyConfigResource.
// Payload is a KubeconfigEvent (phase discriminated by Kubeconfig presence).
tools.ADMIRALTY_CONFIG_EVENT: func(resp tools.NATSResponse) {
kubeconfigEvent := KubeconfigEvent{}
if err := json.Unmarshal(resp.Payload, &kubeconfigEvent); err == nil {
if kubeconfigEvent.Kubeconfig != "" {
// Phase 2: kubeconfig present → this peer is the TARGET (scheduler).
NewAdmiraltySetter(kubeconfigEvent.ExecutionsID).InitializeAsTarget(
context.Background(), kubeconfigEvent)
} else {
err := CreateNamespace(kubeconfigEvent.ExecutionsID)
fmt.Println("NS", err)
// Phase 1: no kubeconfig → this peer is the SOURCE (compute).
NewAdmiraltySetter(kubeconfigEvent.ExecutionsID).InitializeAsSource(
context.Background(), kubeconfigEvent.SourcePeerID, kubeconfigEvent.DestPeerID, kubeconfigEvent.OriginID)
}
}
},
// ─── MINIO_CONFIG_EVENT ──────────────────────────────────────────────────────
// Forwarded by oc-discovery after receiving via libp2p ProtocolMinioConfigResource.
// Payload is a MinioCredentialEvent (phase discriminated by Access presence).
tools.MINIO_CONFIG_EVENT: func(resp tools.NATSResponse) {
minioEvent := minio.MinioCredentialEvent{}
if err := json.Unmarshal(resp.Payload, &minioEvent); err == nil {
if minioEvent.Access != "" {
// Phase 2: credentials present → this peer is the TARGET (compute).
minio.NewMinioSetter(minioEvent.ExecutionsID, minioEvent.MinioID).InitializeAsTarget(
context.Background(), minioEvent)
} else {
err := CreateNamespace(minioEvent.ExecutionsID)
fmt.Println("NS", err)
// Phase 1: no credentials → this peer is the SOURCE (Minio host).
minio.NewMinioSetter(minioEvent.ExecutionsID, minioEvent.MinioID).InitializeAsSource(
context.Background(), minioEvent.SourcePeerID, minioEvent.DestPeerID, minioEvent.OriginID)
}
}
},
// ─── REMOVE_RESOURCE ────────────────────────────────────────────────────────
// Routed by oc-discovery via ProtocolDeleteResource for datacenter teardown.
// Only STORAGE_RESOURCE and COMPUTE_RESOURCE deletions are handled here.
tools.REMOVE_RESOURCE: func(resp tools.NATSResponse) {
switch resp.Datatype {
case tools.STORAGE_RESOURCE:
deleteEvent := minio.MinioDeleteEvent{}
if err := json.Unmarshal(resp.Payload, &deleteEvent); err == nil && deleteEvent.ExecutionsID != "" {
go minio.NewMinioSetter(deleteEvent.ExecutionsID, deleteEvent.MinioID).
TeardownAsSource(context.Background(), deleteEvent)
}
case tools.COMPUTE_RESOURCE:
argo := &ArgoKubeEvent{}
if err := json.Unmarshal(resp.Payload, argo); err == nil && argo.ExecutionsID != "" {
go NewAdmiraltySetter(argo.ExecutionsID).TeardownAsSource(context.Background())
}
}
},
})
}

269
infrastructure/nats/nats.go Normal file
View File

@@ -0,0 +1,269 @@
package nats
import (
"context"
"encoding/json"
"fmt"
"oc-datacenter/infrastructure/admiralty"
"oc-datacenter/infrastructure/kubernetes"
"oc-datacenter/infrastructure/kubernetes/models"
"oc-datacenter/infrastructure/storage"
"sync"
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/tools"
)
// roleWaiters maps executionID → channel expecting the role-assignment message from OC discovery.
var roleWaiters sync.Map
// ArgoKubeEvent carries the peer-routing metadata for a resource provisioning event.
//
// When MinioID is non-empty and Local is false, the event concerns Minio credential provisioning.
// When Local is true, the event concerns local PVC provisioning.
// Otherwise it concerns Admiralty kubeconfig provisioning.
type ArgoKubeEvent struct {
ExecutionsID string `json:"executions_id"`
DestPeerID string `json:"dest_peer_id"`
Type tools.DataType `json:"data_type"`
SourcePeerID string `json:"source_peer_id"`
MinioID string `json:"minio_id,omitempty"`
// Local signals that this STORAGE_RESOURCE event is for a local PVC (not Minio).
Local bool `json:"local,omitempty"`
StorageName string `json:"storage_name,omitempty"`
// OriginID is the peer that initiated the request; the PB_CONSIDERS
// response is routed back to this peer once provisioning completes.
OriginID string `json:"origin_id,omitempty"`
// Images is the list of container images to pre-pull on the target peer
// before the workflow starts. Empty for STORAGE_RESOURCE events.
Images []string `json:"images,omitempty"`
}
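As a concrete reference for the dispatch below, a compute-oriented event (no MinioID, Local unset) could be serialized like this; the peer IDs and images are placeholders:

// Illustrative sketch only: a COMPUTE_RESOURCE ArgoKubeEvent with placeholder values.
evt := ArgoKubeEvent{
	ExecutionsID: "123e4567-e89b-12d3-a456-426614174000",
	DestPeerID:   "peer-b",
	Type:         tools.COMPUTE_RESOURCE,
	SourcePeerID: "peer-a",
	OriginID:     "peer-a",
	Images:       []string{"alpine:3.19"},
}
payload, _ := json.Marshal(evt) // delivered by oc-discovery on ARGO_KUBE_EVENT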
// ListenNATS starts all NATS subscriptions for the infrastructure layer.
// Must be launched in a goroutine from main.
func ListenNATS() {
tools.NewNATSCaller().ListenNats(map[tools.NATSMethod]func(tools.NATSResponse){
// ─── ARGO_KUBE_EVENT ────────────────────────────────────────────────────────
// Triggered by oc-discovery to notify this peer of a provisioning task.
// Dispatches to Admiralty, Minio, or local PVC based on event fields.
tools.ARGO_KUBE_EVENT: func(resp tools.NATSResponse) {
argo := &ArgoKubeEvent{}
if err := json.Unmarshal(resp.Payload, argo); err != nil {
return
}
kube := kubernetes.NewKubernetesService(argo.ExecutionsID)
if argo.Type == tools.STORAGE_RESOURCE {
if argo.Local {
fmt.Println("DETECT LOCAL PVC ARGO_KUBE_EVENT")
// ── Local PVC provisioning ──────────────────────────────────
setter := storage.NewPVCSetter(argo.ExecutionsID, argo.MinioID)
event := storage.PVCProvisionEvent{
ExecutionsID: argo.ExecutionsID,
StorageID: argo.MinioID,
StorageName: argo.StorageName,
SourcePeerID: argo.SourcePeerID,
DestPeerID: argo.DestPeerID,
OriginID: argo.OriginID,
}
if argo.SourcePeerID == argo.DestPeerID {
fmt.Println("CONFIG PVC MYSELF")
err := kube.CreateNamespace()
fmt.Println("NS", err)
go setter.InitializeAsSource(context.Background(), event, true)
} else {
// Cross-peer: route to dest peer via PB_PVC_CONFIG.
if b, err := json.Marshal(event); err == nil {
if b2, err := json.Marshal(&tools.PropalgationMessage{
Payload: b,
Action: tools.PB_PVC_CONFIG,
}); err == nil {
fmt.Println("CONFIG PVC THEM")
go tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{
FromApp: "oc-datacenter",
Datatype: -1,
User: resp.User,
Method: int(tools.PROPALGATION_EVENT),
Payload: b2,
})
}
}
}
} else {
fmt.Println("DETECT STORAGE ARGO_KUBE_EVENT")
// ── Minio credential provisioning ──────────────────────────────
setter := storage.NewMinioSetter(argo.ExecutionsID, argo.MinioID)
if argo.SourcePeerID == argo.DestPeerID {
fmt.Println("CONFIG MYSELF")
err := kube.CreateNamespace()
fmt.Println("NS", err)
go setter.InitializeAsSource(context.Background(), argo.SourcePeerID, argo.DestPeerID, argo.OriginID, true)
} else {
// Different peers: publish Phase-1 PB_MINIO_CONFIG (Access == "")
// so oc-discovery routes the role-assignment to the Minio host.
phase1 := storage.MinioCredentialEvent{
ExecutionsID: argo.ExecutionsID,
MinioID: argo.MinioID,
SourcePeerID: argo.SourcePeerID,
DestPeerID: argo.DestPeerID,
OriginID: argo.OriginID,
}
if b, err := json.Marshal(phase1); err == nil {
if b2, err := json.Marshal(&tools.PropalgationMessage{
Payload: b,
Action: tools.PB_MINIO_CONFIG,
}); err == nil {
fmt.Println("CONFIG THEM")
go tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{
FromApp: "oc-datacenter",
Datatype: -1,
User: resp.User,
Method: int(tools.PROPALGATION_EVENT),
Payload: b2,
})
}
}
}
}
} else {
fmt.Println("DETECT COMPUTE ARGO_KUBE_EVENT")
// ── Pre-pull + Admiralty kubeconfig provisioning ─────────────
fmt.Println(argo.SourcePeerID, argo.DestPeerID)
if argo.SourcePeerID == argo.DestPeerID {
fmt.Println("CONFIG MYSELF")
kube := kubernetes.NewKubernetesService(argo.ExecutionsID)
err := kube.CreateNamespace()
fmt.Println("NS", err)
go func(a ArgoKubeEvent) {
ctx := context.Background()
// Pre-pull first: PB_CONSIDERS is only sent once the pre-pull has completed.
if len(a.Images) > 0 {
if err := kube.RunPrepull(ctx, a.Images); err != nil {
logger := oclib.GetLogger()
logger.Error().Msgf("RunPrepull local: %v", err)
}
}
admiralty.NewAdmiraltySetter(a.ExecutionsID).InitializeAsSource(
ctx, a.SourcePeerID, a.DestPeerID, a.OriginID, true, a.Images)
}(*argo)
} else if b, err := json.Marshal(argo); err == nil {
if b2, err := json.Marshal(&tools.PropalgationMessage{
Payload: b,
Action: tools.PB_ADMIRALTY_CONFIG,
}); err == nil {
fmt.Println("CONFIG THEM")
go tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{
FromApp: "oc-datacenter",
Datatype: -1,
User: resp.User,
Method: int(tools.PROPALGATION_EVENT),
Payload: b2,
})
}
}
}
},
// ─── ADMIRALTY_CONFIG_EVENT ─────────────────────────────────────────────────
// Forwarded by oc-discovery after receiving via libp2p ProtocolAdmiraltyConfigResource.
// Payload is a KubeconfigEvent (phase discriminated by Kubeconfig presence).
tools.ADMIRALTY_CONFIG_EVENT: func(resp tools.NATSResponse) {
kubeconfigEvent := models.KubeconfigEvent{}
if err := json.Unmarshal(resp.Payload, &kubeconfigEvent); err == nil {
if kubeconfigEvent.Kubeconfig != "" {
// Phase 2: kubeconfig present → this peer is the TARGET (scheduler).
fmt.Println("CreateAdmiraltyTarget")
admiralty.NewAdmiraltySetter(kubeconfigEvent.ExecutionsID).InitializeAsTarget(
context.Background(), kubeconfigEvent, false)
} else {
kube := kubernetes.NewKubernetesService(kubeconfigEvent.ExecutionsID)
err := kube.CreateNamespace()
fmt.Println("NS", err)
// Phase 1: no kubeconfig → this peer is the SOURCE (compute).
if len(kubeconfigEvent.Images) > 0 {
if err := kube.RunPrepull(context.Background(), kubeconfigEvent.Images); err != nil {
logger := oclib.GetLogger()
logger.Error().Msgf("RunPrepull local: %v", err)
}
}
fmt.Println("CreateAdmiraltySource")
admiralty.NewAdmiraltySetter(kubeconfigEvent.ExecutionsID).InitializeAsSource(
context.Background(), kubeconfigEvent.SourcePeerID, kubeconfigEvent.DestPeerID,
kubeconfigEvent.OriginID, false, kubeconfigEvent.Images)
}
}
},
// ─── MINIO_CONFIG_EVENT ──────────────────────────────────────────────────────
// Forwarded by oc-discovery after receiving via libp2p ProtocolMinioConfigResource.
// Payload is a MinioCredentialEvent (phase discriminated by Access presence).
tools.MINIO_CONFIG_EVENT: func(resp tools.NATSResponse) {
minioEvent := storage.MinioCredentialEvent{}
if err := json.Unmarshal(resp.Payload, &minioEvent); err == nil {
if minioEvent.Access != "" {
// Phase 2: credentials present → this peer is the TARGET (compute).
storage.NewMinioSetter(minioEvent.ExecutionsID, minioEvent.MinioID).InitializeAsTarget(
context.Background(), minioEvent, false)
} else {
err := kubernetes.NewKubernetesService(minioEvent.ExecutionsID).CreateNamespace()
fmt.Println("NS", err)
// Phase 1: no credentials → this peer is the SOURCE (Minio host).
storage.NewMinioSetter(minioEvent.ExecutionsID, minioEvent.MinioID).InitializeAsSource(
context.Background(), minioEvent.SourcePeerID, minioEvent.DestPeerID, minioEvent.OriginID, false)
}
}
},
// ─── PVC_CONFIG_EVENT ────────────────────────────────────────────────────────
// Forwarded by oc-discovery for cross-peer local PVC provisioning.
// The dest peer creates the PVC in its own cluster.
tools.PVC_CONFIG_EVENT: func(resp tools.NATSResponse) {
event := storage.PVCProvisionEvent{}
if err := json.Unmarshal(resp.Payload, &event); err == nil {
err := kubernetes.NewKubernetesService(event.ExecutionsID).CreateNamespace()
fmt.Println("NS", err)
storage.NewPVCSetter(event.ExecutionsID, event.StorageID).InitializeAsSource(
context.Background(), event, false)
}
},
// ─── WORKFLOW_DONE_EVENT ─────────────────────────────────────────────────────
// Emitted by oc-monitord when the top-level Argo workflow reaches a terminal
// phase. oc-datacenter is responsible only for infrastructure teardown here:
// booking/execution state management is handled entirely by oc-scheduler.
tools.WORKFLOW_DONE_EVENT: func(resp tools.NATSResponse) {
var evt tools.WorkflowLifecycleEvent
if err := json.Unmarshal(resp.Payload, &evt); err != nil || evt.ExecutionsID == "" {
return
}
go kubernetes.NewKubernetesService(evt.ExecutionsID).TeardownForExecution(evt.ExecutionID)
},
// ─── REMOVE_RESOURCE ────────────────────────────────────────────────────────
// Routed by oc-discovery via ProtocolDeleteResource for datacenter teardown.
// Only STORAGE_RESOURCE and COMPUTE_RESOURCE deletions are handled here.
tools.REMOVE_RESOURCE: func(resp tools.NATSResponse) {
switch resp.Datatype {
case tools.STORAGE_RESOURCE:
// Try PVC delete first (Local=true), fall back to Minio.
pvcEvent := storage.PVCDeleteEvent{}
if err := json.Unmarshal(resp.Payload, &pvcEvent); err == nil && pvcEvent.ExecutionsID != "" && pvcEvent.StorageName != "" {
go storage.NewPVCSetter(pvcEvent.ExecutionsID, pvcEvent.StorageID).
TeardownAsSource(context.Background(), pvcEvent)
} else {
deleteEvent := storage.MinioDeleteEvent{}
if err := json.Unmarshal(resp.Payload, &deleteEvent); err == nil && deleteEvent.ExecutionsID != "" {
go storage.NewMinioSetter(deleteEvent.ExecutionsID, deleteEvent.MinioID).
TeardownAsSource(context.Background(), deleteEvent)
}
}
case tools.COMPUTE_RESOURCE:
argo := &ArgoKubeEvent{}
if err := json.Unmarshal(resp.Payload, argo); err == nil && argo.ExecutionsID != "" {
go admiralty.NewAdmiraltySetter(argo.ExecutionsID).TeardownAsSource(context.Background())
}
}
},
})
}

View File

@@ -1,4 +1,4 @@
package storage
import (
"context"

View File

@@ -1,4 +1,4 @@
package storage
import (
"context"
@@ -9,6 +9,8 @@ import (
"oc-datacenter/conf" "oc-datacenter/conf"
oclib "cloud.o-forge.io/core/oc-lib" oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/dbs"
bookingmodel "cloud.o-forge.io/core/oc-lib/models/booking"
"cloud.o-forge.io/core/oc-lib/models/live" "cloud.o-forge.io/core/oc-lib/models/live"
"cloud.o-forge.io/core/oc-lib/tools" "cloud.o-forge.io/core/oc-lib/tools"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -42,9 +44,22 @@ type minioConsidersPayload struct {
Error *string `json:"error,omitempty"`
}
// MinioSetter carries the execution context for a Minio credential provisioning.
type MinioSetter struct {
ExecutionsID string // used as both the bucket name and the K8s namespace suffix
MinioID string // ID of the Minio storage resource
}
func NewMinioSetter(execID, minioID string) *MinioSetter {
return &MinioSetter{ExecutionsID: execID, MinioID: minioID}
}
// emitConsiders publishes a PB_CONSIDERS back to OriginID with the result of
// the minio provisioning. secret is the provisioned credential; err is nil on success.
// When self is true the origin is the local peer: emits directly on CONSIDERS_EVENT
// instead of routing through PROPALGATION_EVENT.
func (m *MinioSetter) emitConsiders(executionsID, originID, secret string, provErr error, self bool) {
fmt.Println("emitConsiders !")
var errStr *string
if provErr != nil {
s := provErr.Error()
@@ -56,6 +71,15 @@ func emitConsiders(executionsID, originID, secret string, provErr error) {
Secret: secret,
Error: errStr,
})
if self {
go tools.NewNATSCaller().SetNATSPub(tools.CONSIDERS_EVENT, tools.NATSResponse{
FromApp: "oc-datacenter",
Datatype: tools.STORAGE_RESOURCE,
Method: int(tools.CONSIDERS_EVENT),
Payload: payload,
})
return
}
b, _ := json.Marshal(&tools.PropalgationMessage{
DataType: tools.STORAGE_RESOURCE.EnumIndex(),
Action: tools.PB_CONSIDERS,
@@ -69,16 +93,6 @@ func emitConsiders(executionsID, originID, secret string, provErr error) {
})
}
// MinioSetter carries the execution context for a Minio credential provisioning.
type MinioSetter struct {
ExecutionsID string // used as both the bucket name and the K8s namespace suffix
MinioID string // ID of the Minio storage resource
}
func NewMinioSetter(execID, minioID string) *MinioSetter {
return &MinioSetter{ExecutionsID: execID, MinioID: minioID}
}
// InitializeAsSource is called on the peer that hosts the Minio instance.
//
// It:
@@ -88,7 +102,7 @@ func NewMinioSetter(execID, minioID string) *MinioSetter {
// 4. If source and dest are the same peer, calls InitializeAsTarget directly.
// Otherwise, publishes a MinioCredentialEvent via NATS (Phase 2) so that
// oc-discovery can route the credentials to the compute peer.
func (m *MinioSetter) InitializeAsSource(ctx context.Context, localPeerID, destPeerID, originID string, self bool) {
logger := oclib.GetLogger()
url, err := m.loadMinioURL(localPeerID)
@@ -128,7 +142,7 @@ func (m *MinioSetter) InitializeAsSource(ctx context.Context, localPeerID, destP
if destPeerID == localPeerID {
// Same peer: store the secret locally without going through NATS.
m.InitializeAsTarget(ctx, event, true)
return
}
@@ -138,7 +152,6 @@ func (m *MinioSetter) InitializeAsSource(ctx context.Context, localPeerID, destP
logger.Error().Msg("MinioSetter.InitializeAsSource: failed to marshal credential event: " + err.Error())
return
}
if b, err := json.Marshal(&tools.PropalgationMessage{
DataType: -1,
Action: tools.PB_MINIO_CONFIG,
@@ -146,20 +159,23 @@ func (m *MinioSetter) InitializeAsSource(ctx context.Context, localPeerID, destP
}); err == nil {
go tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{
FromApp: "oc-datacenter",
Datatype: tools.STORAGE_RESOURCE,
User: "",
Method: int(tools.PROPALGATION_EVENT),
Payload: b,
})
logger.Info().Msg("MinioSetter.InitializeAsSource: credentials published via NATS for " + m.ExecutionsID)
}
}
// InitializeAsTarget is called on the peer that runs the compute workload.
//
// It stores the Minio credentials received from the source peer (via NATS or directly)
// as a Kubernetes secret inside the execution namespace, making them available to pods.
// self must be true when the origin peer is the local peer (direct CONSIDERS_EVENT emission).
func (m *MinioSetter) InitializeAsTarget(ctx context.Context, event MinioCredentialEvent, self bool) {
fmt.Println("InitializeAsTarget is Self :", self)
logger := oclib.GetLogger()
k, err := tools.NewKubernetesService(
@@ -173,18 +189,18 @@ func (m *MinioSetter) InitializeAsTarget(ctx context.Context, event MinioCredent
if err := k.CreateSecret(ctx, event.MinioID, event.ExecutionsID, event.Access, event.Secret); err != nil {
logger.Error().Msg("MinioSetter.InitializeAsTarget: failed to create k8s secret: " + err.Error())
m.emitConsiders(event.ExecutionsID, event.OriginID, "", err, self)
return
}
if err := NewMinioService(event.URL).CreateMinioConfigMap(event.MinioID, event.ExecutionsID, event.URL); err != nil {
logger.Error().Msg("MinioSetter.InitializeAsTarget: failed to create config map: " + err.Error())
m.emitConsiders(event.ExecutionsID, event.OriginID, "", err, self)
return
}
logger.Info().Msg("MinioSetter.InitializeAsTarget: Minio credentials stored in namespace " + event.ExecutionsID)
m.emitConsiders(event.ExecutionsID, event.OriginID, event.Secret, nil, self)
}
// MinioDeleteEvent is the NATS payload used to tear down Minio resources.
@@ -213,7 +229,7 @@ func (m *MinioSetter) TeardownAsTarget(ctx context.Context, event MinioDeleteEve
)
if err != nil {
logger.Error().Msg("MinioSetter.TeardownAsTarget: failed to create k8s service: " + err.Error())
m.emitConsiders(event.ExecutionsID, event.OriginID, "", err, event.SourcePeerID == event.DestPeerID)
return
}
@@ -295,3 +311,52 @@ func (m *MinioSetter) loadMinioURL(peerID string) (string, error) {
}
return "", fmt.Errorf("loadMinioURL: no live storage found for minio ID %s", m.MinioID)
}
// TeardownForExecution tears down all Minio configuration for the execution:
// - storage bookings where this peer is the compute target → TeardownAsTarget
// - storage bookings where this peer is the Minio source → TeardownAsSource
func (m *MinioSetter) TeardownForExecution(ctx context.Context, localPeerID string) {
logger := oclib.GetLogger()
res := oclib.NewRequest(oclib.LibDataEnum(oclib.BOOKING), "", localPeerID, []string{}, nil).
Search(&dbs.Filters{
And: map[string][]dbs.Filter{
"executions_id": {{Operator: dbs.EQUAL.String(), Value: m.ExecutionsID}},
"resource_type": {{Operator: dbs.EQUAL.String(), Value: tools.LIVE_STORAGE.EnumIndex()}},
},
}, "", false)
if res.Err != "" || len(res.Data) == 0 {
return
}
for _, dbo := range res.Data {
b, ok := dbo.(*bookingmodel.Booking)
if !ok {
continue
}
if b.DestPeerID == localPeerID {
// This peer is the compute target: tear down K8s secret + configmap.
logger.Info().Msgf("InfraTeardown: Minio target teardown exec=%s storage=%s", m.ExecutionsID, b.ResourceID)
event := MinioDeleteEvent{
ExecutionsID: m.ExecutionsID,
MinioID: b.ResourceID,
SourcePeerID: b.DestPeerID,
DestPeerID: localPeerID,
OriginID: "",
}
m.TeardownAsTarget(ctx, event)
} else {
// This peer is the Minio source: revoke SA + remove execution bucket.
logger.Info().Msgf("InfraTeardown: Minio source teardown exec=%s storage=%s", m.ExecutionsID, b.ResourceID)
event := MinioDeleteEvent{
ExecutionsID: m.ExecutionsID,
MinioID: b.ResourceID,
SourcePeerID: localPeerID,
DestPeerID: b.DestPeerID,
OriginID: "",
}
m.TeardownAsSource(ctx, event)
}
}
}

View File

@@ -0,0 +1,230 @@
package storage
import (
"context"
"encoding/json"
"fmt"
"slices"
"strings"
"oc-datacenter/conf"
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/dbs"
bookingmodel "cloud.o-forge.io/core/oc-lib/models/booking"
"cloud.o-forge.io/core/oc-lib/models/live"
"cloud.o-forge.io/core/oc-lib/tools"
)
// PVCProvisionEvent is the NATS payload for local PVC provisioning.
// Same-peer deployments are handled directly; cross-peer routes via PB_PVC_CONFIG.
type PVCProvisionEvent struct {
ExecutionsID string `json:"executions_id"`
StorageID string `json:"storage_id"`
StorageName string `json:"storage_name"`
SourcePeerID string `json:"source_peer_id"`
DestPeerID string `json:"dest_peer_id"`
OriginID string `json:"origin_id"`
}
// PVCDeleteEvent is the NATS payload for local PVC teardown.
type PVCDeleteEvent struct {
ExecutionsID string `json:"executions_id"`
StorageID string `json:"storage_id"`
StorageName string `json:"storage_name"`
SourcePeerID string `json:"source_peer_id"`
DestPeerID string `json:"dest_peer_id"`
OriginID string `json:"origin_id"`
}
// ClaimName returns the deterministic PVC name shared by oc-datacenter and oc-monitord.
func ClaimName(storageName, executionsID string) string {
return strings.ReplaceAll(strings.ToLower(storageName), " ", "-") + "-" + executionsID
}
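The normalization is deliberately simple (lowercase, spaces to dashes, suffix with the executions ID), so both services derive the same name independently. A worked example with placeholder inputs:

fmt.Println(ClaimName("My Shared Volume", "123e4567-e89b-12d3-a456-426614174000"))
// prints: my-shared-volume-123e4567-e89b-12d3-a456-426614174000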
// PVCSetter carries the execution context for a local PVC provisioning.
type PVCSetter struct {
ExecutionsID string
StorageID string
// ClaimSuffix overrides ExecutionsID as the suffix in ClaimName when non-empty.
// Used when the PVC namespace differs from the claim name suffix (Admiralty target).
ClaimSuffix string
}
func NewPVCSetter(execID, storageID string) *PVCSetter {
return &PVCSetter{ExecutionsID: execID, StorageID: storageID}
}
// NewPVCSetterWithClaimSuffix creates a PVCSetter where the claim name suffix
// differs from the execution namespace (e.g. Admiralty target provisioning).
func NewPVCSetterWithClaimSuffix(storageID, claimSuffix string) *PVCSetter {
return &PVCSetter{StorageID: storageID, ClaimSuffix: claimSuffix}
}
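The split matters for Admiralty: on the target cluster the PVC lives in the local execution namespace, but the claim name must match what the source cluster's workflow references. A sketch of the two constructors, assuming the SourceExecutionsID carried by the KubeconfigEvent is what ends up in ClaimSuffix (placeholder IDs):

// Local case: the claim name suffix defaults to the local executions ID.
local := NewPVCSetter("exec-uuid-local", "storage-id")
// Admiralty target case (assumed wiring): the claim keeps the source cluster's
// executions ID as suffix even though it is created in a different namespace.
remote := NewPVCSetterWithClaimSuffix("storage-id", "exec-uuid-on-source")
_, _ = local, remote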
func (p *PVCSetter) emitConsiders(executionsID, originID string, provErr error, self bool) {
type pvcConsidersPayload struct {
OriginID string `json:"origin_id"`
ExecutionsID string `json:"executions_id"`
Error *string `json:"error,omitempty"`
}
var errStr *string
if provErr != nil {
s := provErr.Error()
errStr = &s
}
payload, _ := json.Marshal(pvcConsidersPayload{
OriginID: originID,
ExecutionsID: executionsID,
Error: errStr,
})
if self {
go tools.NewNATSCaller().SetNATSPub(tools.CONSIDERS_EVENT, tools.NATSResponse{
FromApp: "oc-datacenter",
Datatype: tools.STORAGE_RESOURCE,
Method: int(tools.CONSIDERS_EVENT),
Payload: payload,
})
return
}
b, _ := json.Marshal(&tools.PropalgationMessage{
DataType: tools.STORAGE_RESOURCE.EnumIndex(),
Action: tools.PB_CONSIDERS,
Payload: payload,
})
go tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{
FromApp: "oc-datacenter",
Datatype: -1,
Method: int(tools.PROPALGATION_EVENT),
Payload: b,
})
}
// InitializeAsSource creates the PVC in the execution namespace on the local cluster.
// self must be true when source and dest are the same peer (direct CONSIDERS_EVENT emission).
func (p *PVCSetter) InitializeAsSource(ctx context.Context, event PVCProvisionEvent, self bool) {
logger := oclib.GetLogger()
sizeStr, err := p.loadStorageSize(event.SourcePeerID)
if err != nil {
logger.Error().Msg("PVCSetter.InitializeAsSource: " + err.Error())
p.emitConsiders(event.ExecutionsID, event.OriginID, err, self)
return
}
k, err := tools.NewKubernetesService(
conf.GetConfig().KubeHost+":"+conf.GetConfig().KubePort,
conf.GetConfig().KubeCA, conf.GetConfig().KubeCert, conf.GetConfig().KubeData,
)
if err != nil {
logger.Error().Msg("PVCSetter.InitializeAsSource: failed to create k8s service: " + err.Error())
p.emitConsiders(event.ExecutionsID, event.OriginID, err, self)
return
}
claimSuffix := event.ExecutionsID
if p.ClaimSuffix != "" {
claimSuffix = p.ClaimSuffix
}
claimName := ClaimName(event.StorageName, claimSuffix)
if err := k.CreatePVC(ctx, claimName, event.ExecutionsID, sizeStr); err != nil {
logger.Error().Msg("PVCSetter.InitializeAsSource: failed to create PVC: " + err.Error())
p.emitConsiders(event.ExecutionsID, event.OriginID, err, self)
return
}
logger.Info().Msg("PVCSetter.InitializeAsSource: PVC " + claimName + " created in " + event.ExecutionsID)
p.emitConsiders(event.ExecutionsID, event.OriginID, nil, self)
}
// TeardownAsSource deletes the PVC from the execution namespace.
func (p *PVCSetter) TeardownAsSource(ctx context.Context, event PVCDeleteEvent) {
logger := oclib.GetLogger()
k, err := tools.NewKubernetesService(
conf.GetConfig().KubeHost+":"+conf.GetConfig().KubePort,
conf.GetConfig().KubeCA, conf.GetConfig().KubeCert, conf.GetConfig().KubeData,
)
if err != nil {
logger.Error().Msg("PVCSetter.TeardownAsSource: failed to create k8s service: " + err.Error())
return
}
claimName := ClaimName(event.StorageName, event.ExecutionsID)
if err := k.DeletePVC(ctx, claimName, event.ExecutionsID); err != nil {
logger.Error().Msg("PVCSetter.TeardownAsSource: failed to delete PVC: " + err.Error())
return
}
logger.Info().Msg("PVCSetter.TeardownAsSource: PVC " + claimName + " deleted from " + event.ExecutionsID)
}
// ResolveStorageName returns the live storage name for a given storageID, or "" if not found.
func ResolveStorageName(storageID, peerID string) string {
res := oclib.NewRequest(oclib.LibDataEnum(oclib.LIVE_STORAGE), "", peerID, []string{}, nil).LoadAll(false)
if res.Err != "" {
return ""
}
for _, dbo := range res.Data {
l := dbo.(*live.LiveStorage)
if slices.Contains(l.ResourcesID, storageID) {
return l.GetName()
}
}
return ""
}
// loadStorageSize looks up the SizeGB for this storage in live storages, falling back to 10Gi when no matching live storage declares a positive size.
func (p *PVCSetter) loadStorageSize(peerID string) (string, error) {
res := oclib.NewRequest(oclib.LibDataEnum(oclib.LIVE_STORAGE), "", peerID, []string{}, nil).LoadAll(false)
if res.Err != "" {
return "", fmt.Errorf("loadStorageSize: %s", res.Err)
}
for _, dbo := range res.Data {
l := dbo.(*live.LiveStorage)
if slices.Contains(l.ResourcesID, p.StorageID) && l.SizeGB > 0 {
return fmt.Sprintf("%dGi", l.SizeGB), nil
}
}
return "10Gi", nil
}
// TeardownForExecution deletes all local PVCs provisioned for the execution.
// It searches LIVE_STORAGE bookings and resolves the storage name via the live storage.
func (p *PVCSetter) TeardownForExecution(ctx context.Context, localPeerID string) {
logger := oclib.GetLogger()
res := oclib.NewRequest(oclib.LibDataEnum(oclib.BOOKING), "", localPeerID, []string{}, nil).
Search(&dbs.Filters{
And: map[string][]dbs.Filter{
"executions_id": {{Operator: dbs.EQUAL.String(), Value: p.ExecutionsID}},
"resource_type": {{Operator: dbs.EQUAL.String(), Value: tools.LIVE_STORAGE.EnumIndex()}},
},
}, "", false)
if res.Err != "" || len(res.Data) == 0 {
return
}
for _, dbo := range res.Data {
b, ok := dbo.(*bookingmodel.Booking)
if !ok {
continue
}
// Resolve storage name from live storage to compute the claim name.
storageName := ResolveStorageName(b.ResourceID, localPeerID)
if storageName == "" {
continue
}
logger.Info().Msgf("InfraTeardown: PVC teardown exec=%s storage=%s", p.ExecutionsID, b.ResourceID)
event := PVCDeleteEvent{
ExecutionsID: p.ExecutionsID,
StorageID: b.ResourceID,
StorageName: storageName,
SourcePeerID: localPeerID,
DestPeerID: b.DestPeerID,
OriginID: "",
}
p.StorageID = b.ResourceID
p.TeardownAsSource(ctx, event)
}
}

23
main.go
View File

@@ -1,11 +1,9 @@
package main
import (
"encoding/base64"
"oc-datacenter/conf" "oc-datacenter/conf"
"oc-datacenter/infrastructure" "oc-datacenter/infrastructure"
_ "oc-datacenter/routers" _ "oc-datacenter/routers"
"os"
oclib "cloud.o-forge.io/core/oc-lib" oclib "cloud.o-forge.io/core/oc-lib"
beego "github.com/beego/beego/v2/server/web" beego "github.com/beego/beego/v2/server/web"
@@ -17,29 +15,24 @@ func main() {
// Load the right config file
o := oclib.GetConfLoader(appname)
conf.GetConfig().Mode = o.GetStringDefault("MODE", "kubernetes")
conf.GetConfig().KubeHost = o.GetStringDefault("KUBERNETES_SERVICE_HOST", "kubernetes.default.svc.cluster.local")
conf.GetConfig().KubePort = o.GetStringDefault("KUBERNETES_SERVICE_PORT", "6443")
conf.GetConfig().KubeExternalHost = o.GetStringDefault("KUBE_EXTERNAL_HOST", "")
sDec, err := base64.StdEncoding.DecodeString(o.GetStringDefault("KUBE_CA", "")) conf.GetConfig().KubeCA = o.GetStringDefault("KUBE_CA", "LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJkakNDQVIyZ0F3SUJBZ0lCQURBS0JnZ3Foa2pPUFFRREFqQWpNU0V3SHdZRFZRUUREQmhyTTNNdGMyVnkKZG1WeUxXTmhRREUzTnpNeE1qY3dPVFl3SGhjTk1qWXdNekV3TURjeE9ERTJXaGNOTXpZd016QTNNRGN4T0RFMgpXakFqTVNFd0h3WURWUVFEREJock0zTXRjMlZ5ZG1WeUxXTmhRREUzTnpNeE1qY3dPVFl3V1RBVEJnY3Foa2pPClBRSUJCZ2dxaGtqT1BRTUJCd05DQUFReG81cXQ0MGxEekczRHJKTE1wRVBrd0ZBY1FmbC8vVE1iWjZzemMreHAKbmVzVzRTSTdXK1lWdFpRYklmV2xBMTRaazQvRFlDMHc1YlgxZU94RVVuL0pvMEl3UURBT0JnTlZIUThCQWY4RQpCQU1DQXFRd0R3WURWUjBUQVFIL0JBVXdBd0VCL3pBZEJnTlZIUTRFRmdRVXBLM2pGK25IRlZSbDcwb3ZRVGZnCmZabGNQZE13Q2dZSUtvWkl6ajBFQXdJRFJ3QXdSQUlnVnkyaUx0Y0xaYm1vTnVoVHdKbU5sWlo3RVlBYjJKNW0KSjJYbG1UbVF5a2tDSUhLbzczaDBkdEtUZTlSa0NXYTJNdStkS1FzOXRFU0tBV0x1emlnYXBHYysKLS0tLS1FTkQgQ0VSVElGSUNBVEUtLS0tLQo=")
if err == nil { conf.GetConfig().KubeCert = o.GetStringDefault("KUBE_CERT", "LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJrakNDQVRlZ0F3SUJBZ0lJQUkvSUg2R2Rodm93Q2dZSUtvWkl6ajBFQXdJd0l6RWhNQjhHQTFVRUF3d1kKYXpOekxXTnNhV1Z1ZEMxallVQXhOemN6TVRJM01EazJNQjRYRFRJMk1ETXhNREEzTVRneE5sb1hEVEkzTURNeApNREEzTVRneE5sb3dNREVYTUJVR0ExVUVDaE1PYzNsemRHVnRPbTFoYzNSbGNuTXhGVEFUQmdOVkJBTVRESE41CmMzUmxiVHBoWkcxcGJqQlpNQk1HQnlxR1NNNDlBZ0VHQ0NxR1NNNDlBd0VIQTBJQUJQTTdBVEZQSmFMMjUrdzAKUU1vZUIxV2hBRW4vWnViM0tSRERrYnowOFhwQWJ2akVpdmdnTkdpdG4wVmVsaEZHamRmNHpBT29Nd1J3M21kbgpYSGtHVDB5alNEQkdNQTRHQTFVZER3RUIvd1FFQXdJRm9EQVRCZ05WSFNVRUREQUtCZ2dyQmdFRkJRY0RBakFmCkJnTlZIU01FR0RBV2dCUVZLOThaMEMxcFFyVFJSMGVLZHhIa2o0ejFJREFLQmdncWhrak9QUVFEQWdOSkFEQkcKQWlFQXZYWll6Zk9iSUtlWTRtclNsRmt4ZS80a0E4K01ieDc1UDFKRmNlRS8xdGNDSVFDNnM0ZXlZclhQYmNWSgpxZm5EamkrZ1RacGttN0tWSTZTYTlZN2FSRGFabUE9PQotLS0tLUVORCBDRVJUSUZJQ0FURS0tLS0tCi0tLS0tQkVHSU4gQ0VSVElGSUNBVEUtLS0tLQpNSUlCZURDQ0FSMmdBd0lCQWdJQkFEQUtCZ2dxaGtqT1BRUURBakFqTVNFd0h3WURWUVFEREJock0zTXRZMnhwClpXNTBMV05oUURFM056TXhNamN3T1RZd0hoY05Nall3TXpFd01EY3hPREUyV2hjTk16WXdNekEzTURjeE9ERTIKV2pBak1TRXdId1lEVlFRRERCaHJNM010WTJ4cFpXNTBMV05oUURFM056TXhNamN3T1RZd1dUQVRCZ2NxaGtqTwpQUUlCQmdncWhrak9QUU1CQndOQ0FBUzV1NGVJbStvVnV1SFI0aTZIOU1kVzlyUHdJbFVPNFhIMEJWaDRUTGNlCkNkMnRBbFVXUW5FakxMdlpDWlVaYTlzTlhKOUVtWWt5S0dtQWR2TE9FbUVrbzBJd1FEQU9CZ05WSFE4QkFmOEUKQkFNQ0FxUXdEd1lEVlIwVEFRSC9CQVV3QXdFQi96QWRCZ05WSFE0RUZnUVVGU3ZmR2RBdGFVSzAwVWRIaW5jUgo1SStNOVNBd0NnWUlLb1pJemowRUF3SURTUUF3UmdJaEFMY2xtQnR4TnpSVlBvV2hoVEVKSkM1Z3VNSGsvcFZpCjFvYXJ2UVJxTWRKcUFpRUEyR1dNTzlhZFFYTEQwbFZKdHZMVkc1M3I0M0lxMHpEUUQwbTExMVZyL1MwPQotLS0tLUVORCBDRVJUSUZJQ0FURS0tLS0tCg==")
conf.GetConfig().KubeCA = string(sDec) conf.GetConfig().KubeData = o.GetStringDefault("KUBE_DATA", "LS0tLS1CRUdJTiBFQyBQUklWQVRFIEtFWS0tLS0tCk1IY0NBUUVFSUVkSTRZN3lRU1ZwRGNrblhsQmJEaXBWZHRMWEVsYVBkN3VBZHdBWFFya2xvQW9HQ0NxR1NNNDkKQXdFSG9VUURRZ0FFOHpzQk1VOGxvdmJuN0RSQXloNEhWYUVBU2Y5bTV2Y3BFTU9SdlBUeGVrQnUrTVNLK0NBMAphSzJmUlY2V0VVYU4xL2pNQTZnekJIRGVaMmRjZVFaUFRBPT0KLS0tLS1FTkQgRUMgUFJJVkFURSBLRVktLS0tLQo=")
}
sDec, err = base64.StdEncoding.DecodeString(o.GetStringDefault("KUBE_CERT", ""))
if err == nil {
conf.GetConfig().KubeCert = string(sDec)
}
sDec, err = base64.StdEncoding.DecodeString(o.GetStringDefault("KUBE_DATA", ""))
if err == nil {
conf.GetConfig().KubeData = string(sDec)
}
conf.GetConfig().MonitorMode = o.GetStringDefault("MONITOR_MODE", "prometheus")
conf.GetConfig().MinioRootKey = o.GetStringDefault("MINIO_ADMIN_ACCESS", "")
conf.GetConfig().MinioRootSecret = o.GetStringDefault("MINIO_ADMIN_SECRET", "")
oclib.InitAPI(appname)
infrastructure.BootstrapAllowedImages()
go infrastructure.ListenNATS()
go infrastructure.WatchBookings()
go infrastructure.WatchInfra()
beego.Run()
}

View File

@@ -29,6 +29,11 @@ func init() {
&controllers.VersionController{},
),
),
beego.NSNamespace("/allowed-image",
beego.NSInclude(
&controllers.AllowedImageController{},
),
),
)
beego.AddNamespace(ns)
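Once registered, the new route can be smoke-tested with a plain HTTP client; the base URL, any parent namespace prefix defined elsewhere in router.go, and the bearer token below are assumptions, not values from the commit:

// Hypothetical request against the new listing endpoint.
base := "http://localhost:8080" // placeholder: wherever oc-datacenter is exposed
req, _ := http.NewRequest(http.MethodGet, base+"/allowed-image/", nil)
req.Header.Set("Authorization", "Bearer "+token) // token: a valid JWT for this peer
resp, err := http.DefaultClient.Do(req)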