From c87245e83f3f3d99c1cedb556f5c864d88cdbac4 Mon Sep 17 00:00:00 2001 From: mr Date: Wed, 25 Mar 2026 11:11:03 +0100 Subject: [PATCH] Oc-Datacenter Allowed Resource And Prepull Images For Efficient process --- controllers/allowed_image.go | 96 ++++++ go.mod | 2 +- go.sum | 4 + infrastructure/{ => admiralty}/admiralty.go | 158 ++++++--- infrastructure/allowed_image_bootstrap.go | 39 +++ infrastructure/booking_watchdog.go | 132 +------ infrastructure/kubernetes/kubernetes.go | 323 ++++++++++++++++++ .../kubernetes/models}/kubeconfig.go | 22 +- .../watchdog.go} | 64 +++- infrastructure/namespace.go | 28 -- infrastructure/{ => nats}/nats.go | 100 +++--- infrastructure/{minio => storage}/minio.go | 2 +- .../{minio => storage}/minio_setter.go | 83 ++++- infrastructure/storage/pvc_setter.go | 68 +++- main.go | 2 + routers/router.go | 5 + 16 files changed, 836 insertions(+), 292 deletions(-) create mode 100644 controllers/allowed_image.go rename infrastructure/{ => admiralty}/admiralty.go (73%) create mode 100644 infrastructure/allowed_image_bootstrap.go create mode 100644 infrastructure/kubernetes/kubernetes.go rename {models => infrastructure/kubernetes/models}/kubeconfig.go (72%) rename infrastructure/{infra_watchdog.go => kubernetes/watchdog.go} (83%) delete mode 100644 infrastructure/namespace.go rename infrastructure/{ => nats}/nats.go (78%) rename infrastructure/{minio => storage}/minio.go (99%) rename infrastructure/{minio => storage}/minio_setter.go (83%) diff --git a/controllers/allowed_image.go b/controllers/allowed_image.go new file mode 100644 index 0000000..22e443a --- /dev/null +++ b/controllers/allowed_image.go @@ -0,0 +1,96 @@ +package controllers + +import ( + "encoding/json" + "slices" + + oclib "cloud.o-forge.io/core/oc-lib" + beego "github.com/beego/beego/v2/server/web" + + "cloud.o-forge.io/core/oc-lib/models/allowed_image" +) + +// AllowedImageController gère la liste locale des images autorisées à persister +// sur ce peer après l'exécution d'un 
workflow. +// +// GET /allowed-image/ → tous les utilisateurs authentifiés +// GET /allowed-image/:id → tous les utilisateurs authentifiés +// POST /allowed-image/ → peer admin uniquement +// DELETE /allowed-image/:id → peer admin uniquement (bloqué si IsDefault) +type AllowedImageController struct { + beego.Controller +} + +// isAdmin vérifie que l'appelant est peer admin (groupe "admin" dans le token JWT). +func isAdmin(groups []string) bool { + return slices.Contains(groups, "admin") +} + +// @Title GetAll +// @Description Retourne toutes les images autorisées à persister sur ce peer +// @Success 200 {object} []allowed_image.AllowedImage +// @router / [get] +func (o *AllowedImageController) GetAll() { + user, peerID, groups := oclib.ExtractTokenInfo(*o.Ctx.Request) + res := oclib.NewRequest(oclib.LibDataEnum(oclib.ALLOWED_IMAGE), user, peerID, groups, nil).LoadAll(false) + o.Data["json"] = res + o.ServeJSON() +} + +// @Title Get +// @Description Retourne une image autorisée par son ID +// @Param id path string true "ID de l'image autorisée" +// @Success 200 {object} allowed_image.AllowedImage +// @router /:id [get] +func (o *AllowedImageController) Get() { + user, peerID, groups := oclib.ExtractTokenInfo(*o.Ctx.Request) + id := o.Ctx.Input.Param(":id") + res := oclib.NewRequest(oclib.LibDataEnum(oclib.ALLOWED_IMAGE), user, peerID, groups, nil).LoadOne(id) + o.Data["json"] = res + o.ServeJSON() +} + +// @Title Post +// @Description Ajoute une image à la liste des images autorisées (peer admin uniquement) +// @Param body body allowed_image.AllowedImage true "Image à autoriser" +// @Success 200 {object} allowed_image.AllowedImage +// @router / [post] +func (o *AllowedImageController) Post() { + user, peerID, groups := oclib.ExtractTokenInfo(*o.Ctx.Request) + if !isAdmin(groups) { + o.Ctx.Output.SetStatus(403) + o.Data["json"] = map[string]string{"err": "peer admin required"} + o.ServeJSON() + return + } + var img allowed_image.AllowedImage + if err := 
json.Unmarshal(o.Ctx.Input.RequestBody, &img); err != nil { + o.Ctx.Output.SetStatus(400) + o.Data["json"] = map[string]string{"err": err.Error()} + o.ServeJSON() + return + } + img.IsDefault = false // l'opérateur ne peut pas créer d'entrées bootstrap via API + res := oclib.NewRequest(oclib.LibDataEnum(oclib.ALLOWED_IMAGE), user, peerID, groups, nil).StoreOne(img.Serialize(&img)) + o.Data["json"] = res + o.ServeJSON() +} + +// @Title Delete +// @Description Supprime une image de la liste des images autorisées (peer admin uniquement, entrées bootstrap non supprimables) +// @Param id path string true "ID de l'image autorisée" +// @Success 200 {object} allowed_image.AllowedImage +// @router /:id [delete] +func (o *AllowedImageController) Delete() { + user, peerID, groups := oclib.ExtractTokenInfo(*o.Ctx.Request) + if !isAdmin(groups) { + o.Ctx.Output.SetStatus(403) + o.Data["json"] = map[string]string{"err": "peer admin required"} + o.ServeJSON() + return + } + id := o.Ctx.Input.Param(":id") + res := oclib.NewRequest(oclib.LibDataEnum(oclib.ALLOWED_IMAGE), user, peerID, groups, nil).DeleteOne(id) + o.Data["json"] = res + o.ServeJSON() +} diff --git a/go.mod b/go.mod index f4306be..e31d6ef 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,6 @@ module oc-datacenter go 1.25.0 require ( - cloud.o-forge.io/core/oc-lib v0.0.0-20260323152020-211339947c46 github.com/beego/beego/v2 v2.3.8 github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 github.com/minio/madmin-go/v4 v4.1.1 @@ -16,6 +15,7 @@ require ( ) require ( + cloud.o-forge.io/core/oc-lib v0.0.0-20260325092016-4580200e8057 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/biter777/countries v1.7.5 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect diff --git a/go.sum b/go.sum index 2785808..c436ecf 100644 --- a/go.sum +++ b/go.sum @@ -10,6 +10,10 @@ cloud.o-forge.io/core/oc-lib v0.0.0-20260323112935-b76b22a8fbee h1:XQ85OdhYry8zo cloud.o-forge.io/core/oc-lib 
v0.0.0-20260323112935-b76b22a8fbee/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= cloud.o-forge.io/core/oc-lib v0.0.0-20260323152020-211339947c46 h1:71WVrnLj0SM6PfQxCh25b2JGcL/1MZ2lYt254R/8n28= cloud.o-forge.io/core/oc-lib v0.0.0-20260323152020-211339947c46/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= +cloud.o-forge.io/core/oc-lib v0.0.0-20260324114937-6d0c78946e8b h1:y0rppyzGIQTIyvapWwHZ8t20wMaSaMU6NoZLkMCui8w= +cloud.o-forge.io/core/oc-lib v0.0.0-20260324114937-6d0c78946e8b/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= +cloud.o-forge.io/core/oc-lib v0.0.0-20260325092016-4580200e8057 h1:pR+lZzcCWZ0kke2r2xXa7OpdbLpPW3gZSWZ8gGHh274= +cloud.o-forge.io/core/oc-lib v0.0.0-20260325092016-4580200e8057/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/Masterminds/semver/v3 v3.4.0 h1:Zog+i5UMtVoCU8oKka5P7i9q9HgrJeGzI9SA1Xbatp0= github.com/Masterminds/semver/v3 v3.4.0/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM= diff --git a/infrastructure/admiralty.go b/infrastructure/admiralty/admiralty.go similarity index 73% rename from infrastructure/admiralty.go rename to infrastructure/admiralty/admiralty.go index 91a3466..4224418 100644 --- a/infrastructure/admiralty.go +++ b/infrastructure/admiralty/admiralty.go @@ -1,4 +1,4 @@ -package infrastructure +package admiralty import ( "context" @@ -11,10 +11,14 @@ import ( "time" "oc-datacenter/conf" + "oc-datacenter/infrastructure/kubernetes/models" "oc-datacenter/infrastructure/monitor" - "oc-datacenter/models" + "oc-datacenter/infrastructure/storage" oclib "cloud.o-forge.io/core/oc-lib" + "cloud.o-forge.io/core/oc-lib/dbs" + bookingmodel "cloud.o-forge.io/core/oc-lib/models/booking" + "cloud.o-forge.io/core/oc-lib/models/workflow_execution" "cloud.o-forge.io/core/oc-lib/tools" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" @@ -23,30 +27,23 @@ import ( // 
kubeconfigChannels holds channels waiting for kubeconfig delivery (keyed by executionID). var kubeconfigChannels sync.Map -// kubeconfigEvent is the NATS payload used to transfer the kubeconfig from the source peer to the target peer. -type KubeconfigEvent struct { - DestPeerID string `json:"dest_peer_id"` - ExecutionsID string `json:"executions_id"` - Kubeconfig string `json:"kubeconfig"` - SourcePeerID string `json:"source_peer_id"` - // OriginID is the peer that initiated the provisioning request. - // The PB_CONSIDERS response is routed back to this peer. - OriginID string `json:"origin_id"` -} - // admiraltyConsidersPayload is the PB_CONSIDERS payload emitted after admiralty provisioning. type admiraltyConsidersPayload struct { - OriginID string `json:"origin_id"` - ExecutionsID string `json:"executions_id"` - Secret string `json:"secret,omitempty"` - Error *string `json:"error,omitempty"` + OriginID string `json:"origin_id"` + ExecutionsID string `json:"executions_id"` + // PeerID is the compute peer (SourcePeerID of the original ArgoKubeEvent). + // oc-monitord uses it to build a unique considers key per peer, avoiding + // broadcast collisions when multiple compute peers run in parallel. + PeerID string `json:"peer_id,omitempty"` + Secret string `json:"secret,omitempty"` + Error *string `json:"error,omitempty"` } // emitAdmiraltyConsiders publishes a PB_CONSIDERS back to OriginID with the result // of the admiralty provisioning. secret is the base64-encoded kubeconfig; err is nil on success. // When self is true the origin is the local peer: emits directly on CONSIDERS_EVENT // instead of routing through PROPALGATION_EVENT. 
-func emitAdmiraltyConsiders(executionsID, originID, secret string, provErr error, self bool) { +func emitAdmiraltyConsiders(executionsID, originID, peerID, secret string, provErr error, self bool) { var errStr *string if provErr != nil { s := provErr.Error() @@ -55,6 +52,7 @@ func emitAdmiraltyConsiders(executionsID, originID, secret string, provErr error payload, _ := json.Marshal(admiraltyConsidersPayload{ OriginID: originID, ExecutionsID: executionsID, + PeerID: peerID, Secret: secret, Error: errStr, }) @@ -95,9 +93,15 @@ func NewAdmiraltySetter(execIDS string) *AdmiraltySetter { // InitializeAsSource is called on the peer that acts as the SOURCE cluster (compute provider). // It creates the AdmiraltySource resource, generates a kubeconfig for the target peer, // and publishes it on NATS so the target peer can complete its side of the setup. -func (s *AdmiraltySetter) InitializeAsSource(ctx context.Context, localPeerID string, destPeerID string, originID string, self bool) error { +func (s *AdmiraltySetter) InitializeAsSource(ctx context.Context, localPeerID string, destPeerID string, originID string, self bool, images []string) error { logger := oclib.GetLogger() + // Local execution: no Admiralty resources needed — just emit PB_CONSIDERS. 
+ if localPeerID == destPeerID { + emitAdmiraltyConsiders(s.ExecutionsID, originID, localPeerID, "", nil, true) + return nil + } + serv, err := tools.NewKubernetesService(conf.GetConfig().KubeHost+":"+conf.GetConfig().KubePort, conf.GetConfig().KubeCA, conf.GetConfig().KubeCert, conf.GetConfig().KubeData) if err != nil { @@ -127,17 +131,14 @@ func (s *AdmiraltySetter) InitializeAsSource(ctx context.Context, localPeerID st return errors.New("InitializeAsSource: failed to marshal kubeconfig: " + err.Error()) } encodedKubeconfig := base64.StdEncoding.EncodeToString(b) - kube := KubeconfigEvent{ - ExecutionsID: s.ExecutionsID, - Kubeconfig: encodedKubeconfig, - SourcePeerID: localPeerID, - DestPeerID: destPeerID, - OriginID: originID, - } - if destPeerID == localPeerID { - // Self case: source and target are the same cluster, no Admiralty target to configure. - emitAdmiraltyConsiders(s.ExecutionsID, originID, encodedKubeconfig, nil, true) - return nil + kube := models.KubeconfigEvent{ + ExecutionsID: s.ExecutionsID, + Kubeconfig: encodedKubeconfig, + SourcePeerID: localPeerID, + DestPeerID: destPeerID, + OriginID: originID, + SourceExecutionsID: s.ExecutionsID, + Images: images, } // Publish the kubeconfig on NATS so the target peer can proceed payload, err := json.Marshal(kube) @@ -167,7 +168,7 @@ func (s *AdmiraltySetter) InitializeAsSource(ctx context.Context, localPeerID st // It waits for the kubeconfig published by the source peer via NATS, then creates // the Secret, AdmiraltyTarget, and polls until the virtual node appears. // self must be true when the origin peer is the local peer (direct CONSIDERS_EVENT emission). 
-func (s *AdmiraltySetter) InitializeAsTarget(ctx context.Context, kubeconfigObj KubeconfigEvent, self bool) { +func (s *AdmiraltySetter) InitializeAsTarget(ctx context.Context, kubeconfigObj models.KubeconfigEvent, self bool) { logger := oclib.GetLogger() defer kubeconfigChannels.Delete(s.ExecutionsID) @@ -185,7 +186,7 @@ func (s *AdmiraltySetter) InitializeAsTarget(ctx context.Context, kubeconfigObj logger.Info().Msg("InitializeAsTarget: creating Namespace " + s.ExecutionsID) if err := serv.CreateNamespace(ctx, s.ExecutionsID); err != nil && !strings.Contains(err.Error(), "already exists") { logger.Error().Msg("InitializeAsTarget: failed to create namespace: " + err.Error()) - emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, "", err, self) + emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, kubeconfigObj.SourcePeerID, "", err, self) return } @@ -193,7 +194,7 @@ func (s *AdmiraltySetter) InitializeAsTarget(ctx context.Context, kubeconfigObj logger.Info().Msg("InitializeAsTarget: creating ServiceAccount sa-" + s.ExecutionsID) if err := serv.CreateServiceAccount(ctx, s.ExecutionsID); err != nil && !strings.Contains(err.Error(), "already exists") { logger.Error().Msg("InitializeAsTarget: failed to create service account: " + err.Error()) - emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, "", err, self) + emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, kubeconfigObj.SourcePeerID, "", err, self) return } @@ -215,7 +216,7 @@ func (s *AdmiraltySetter) InitializeAsTarget(ctx context.Context, kubeconfigObj {"patch"}}, ); err != nil && !strings.Contains(err.Error(), "already exists") { logger.Error().Msg("InitializeAsTarget: failed to create role: " + err.Error()) - emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, "", err, self) + emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, kubeconfigObj.SourcePeerID, "", err, self) return } @@ -224,7 +225,7 @@ func (s *AdmiraltySetter) 
InitializeAsTarget(ctx context.Context, kubeconfigObj logger.Info().Msg("InitializeAsTarget: creating RoleBinding " + rbName) if err := serv.CreateRoleBinding(ctx, s.ExecutionsID, rbName, roleName); err != nil && !strings.Contains(err.Error(), "already exists") { logger.Error().Msg("InitializeAsTarget: failed to create role binding: " + err.Error()) - emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, "", err, self) + emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, kubeconfigObj.SourcePeerID, "", err, self) return } @@ -232,7 +233,7 @@ func (s *AdmiraltySetter) InitializeAsTarget(ctx context.Context, kubeconfigObj logger.Info().Msg("InitializeAsTarget: creating Secret ns-" + s.ExecutionsID) if _, err := serv.CreateKubeconfigSecret(ctx, kubeconfigData, s.ExecutionsID, kubeconfigObj.SourcePeerID); err != nil { logger.Error().Msg("InitializeAsTarget: failed to create kubeconfig secret: " + err.Error()) - emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, "", err, self) + emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, kubeconfigObj.SourcePeerID, "", err, self) return } @@ -244,14 +245,63 @@ func (s *AdmiraltySetter) InitializeAsTarget(ctx context.Context, kubeconfigObj if err == nil { err = fmt.Errorf("CreateAdmiraltyTarget returned nil response") } - emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, "", err, self) + emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, kubeconfigObj.SourcePeerID, "", err, self) return } + // 5. Provision PVCs in the target namespace so Admiralty shadow pods can mount them. + // The claim names must match what oc-monitord generates: {storageName}-{sourceExecutionsID}. 
+ if kubeconfigObj.SourceExecutionsID != "" { + logger.Info().Msg("InitializeAsTarget: provisioning PVCs for source exec " + kubeconfigObj.SourceExecutionsID) + provisionPVCsForTarget(ctx, s.ExecutionsID, kubeconfigObj.SourceExecutionsID, kubeconfigObj.SourcePeerID) + } + // Poll until the virtual node appears (inlined from GetNodeReady controller) logger.Info().Msg("InitializeAsTarget: waiting for virtual node ns-" + s.ExecutionsID) s.waitForNode(ctx, serv, kubeconfigObj.SourcePeerID) - emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, kubeconfigData, nil, self) + emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, kubeconfigObj.SourcePeerID, kubeconfigData, nil, self) +} + +// provisionPVCsForTarget creates PVCs in the Admiralty target namespace for all local +// storages booked under sourceExecutionsID. The claim names use sourceExecutionsID as +// suffix so they match what oc-monitord generates in the workflow spec. +func provisionPVCsForTarget(ctx context.Context, targetNS string, sourceExecutionsID string, peerID string) { + logger := oclib.GetLogger() + + res := oclib.NewRequest(oclib.LibDataEnum(oclib.BOOKING), "", peerID, []string{}, nil). + Search(&dbs.Filters{ + And: map[string][]dbs.Filter{ + "executions_id": {{Operator: dbs.EQUAL.String(), Value: sourceExecutionsID}}, + "resource_type": {{Operator: dbs.EQUAL.String(), Value: tools.LIVE_STORAGE.EnumIndex()}}, + }, + }, "", false) + + if res.Err != "" || len(res.Data) == 0 { + return + } + + for _, dbo := range res.Data { + b, ok := dbo.(*bookingmodel.Booking) + if !ok { + continue + } + storageName := storage.ResolveStorageName(b.ResourceID, peerID) + if storageName == "" { + continue + } + event := storage.PVCProvisionEvent{ + ExecutionsID: targetNS, + StorageID: b.ResourceID, + StorageName: storageName, + SourcePeerID: peerID, + DestPeerID: peerID, + OriginID: peerID, + } + // Use sourceExecutionsID as claim name suffix so it matches oc-monitord's claimName. 
+ setter := storage.NewPVCSetterWithClaimSuffix(b.ResourceID, sourceExecutionsID) + logger.Info().Msgf("InitializeAsTarget: provisioning PVC %s in ns %s", storage.ClaimName(storageName, sourceExecutionsID), targetNS) + setter.InitializeAsSource(ctx, event, true) + } } // waitForNode polls GetOneNode until the Admiralty virtual node appears on this cluster. @@ -361,3 +411,35 @@ func buildHostKubeWithToken(token string) (*models.KubeConfigValue, error) { }}, }, nil } + +// teardownAdmiraltyIfRemote triggers Admiralty TeardownAsTarget only when at +// least one compute booking for the execution is on a remote peer. +// Local executions do not involve Admiralty. +func (s *AdmiraltySetter) TeardownIfRemote(exec *workflow_execution.WorkflowExecution, selfPeerID string) { + logger := oclib.GetLogger() + + res := oclib.NewRequest(oclib.LibDataEnum(oclib.BOOKING), "", selfPeerID, []string{}, nil). + Search(&dbs.Filters{ + And: map[string][]dbs.Filter{ + "executions_id": {{Operator: dbs.EQUAL.String(), Value: exec.ExecutionsID}}, + "resource_type": {{Operator: dbs.EQUAL.String(), Value: tools.COMPUTE_RESOURCE.EnumIndex()}}, + }, + }, "", false) + + if res.Err != "" || len(res.Data) == 0 { + return + } + + for _, dbo := range res.Data { + b, ok := dbo.(*bookingmodel.Booking) + if !ok { + continue + } + if b.DestPeerID != selfPeerID { + logger.Info().Msgf("InfraTeardown: Admiralty teardown exec=%s (remote peer=%s)", + exec.ExecutionsID, b.DestPeerID) + s.TeardownAsTarget(context.Background(), selfPeerID) + return // one teardown per execution is enough + } + } +} diff --git a/infrastructure/allowed_image_bootstrap.go b/infrastructure/allowed_image_bootstrap.go new file mode 100644 index 0000000..f27d4f7 --- /dev/null +++ b/infrastructure/allowed_image_bootstrap.go @@ -0,0 +1,39 @@ +package infrastructure + +import ( + oclib "cloud.o-forge.io/core/oc-lib" + "cloud.o-forge.io/core/oc-lib/dbs" + "cloud.o-forge.io/core/oc-lib/models/allowed_image" +) + +// defaultAllowedImages est 
la liste des images utilitaires légères autorisées +// à persister sur tous les peers sans action de l'opérateur. +// +// Ces entrées sont marquées IsDefault:true et ne peuvent pas être supprimées +// via l'API — elles sont sous contrôle exclusif du code de la plateforme. +var defaultAllowedImages = []allowed_image.AllowedImage{ + {Image: "natsio/nats-box", TagConstraint: "", IsDefault: true}, // outil NATS utilisé par les native tools + {Image: "library/alpine", TagConstraint: "", IsDefault: true}, // base image légère standard + {Image: "library/busybox", TagConstraint: "", IsDefault: true}, // utilitaire shell minimal +} + +// BootstrapAllowedImages insère les images par défaut si elles sont absentes +// en base. Les entrées existantes ne sont pas modifiées. +// À appeler une fois au démarrage, avant beego.Run(). +func BootstrapAllowedImages() { + req := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.ALLOWED_IMAGE), nil) + + for _, img := range defaultAllowedImages { + // Vérifie si une entrée avec ce nom d'image existe déjà. 
+ existing := req.Search(&dbs.Filters{ + And: map[string][]dbs.Filter{ + "image": {{Operator: dbs.EQUAL.String(), Value: img.Image}}, + }, + }, "", false) + if existing.Err != "" || len(existing.Data) > 0 { + continue // déjà présente ou erreur de recherche : on passe + } + local := img // copie pour éviter la capture de boucle + req.StoreOne(local.Serialize(&local)) + } +} diff --git a/infrastructure/booking_watchdog.go b/infrastructure/booking_watchdog.go index 92ab1cc..e54e84c 100644 --- a/infrastructure/booking_watchdog.go +++ b/infrastructure/booking_watchdog.go @@ -1,20 +1,15 @@ package infrastructure import ( - "context" "encoding/json" "fmt" "sync" "time" - "oc-datacenter/infrastructure/minio" - "oc-datacenter/infrastructure/storage" - oclib "cloud.o-forge.io/core/oc-lib" "cloud.o-forge.io/core/oc-lib/dbs" bookingmodel "cloud.o-forge.io/core/oc-lib/models/booking" "cloud.o-forge.io/core/oc-lib/models/common/enum" - "cloud.o-forge.io/core/oc-lib/models/workflow_execution" "cloud.o-forge.io/core/oc-lib/tools" "go.mongodb.org/mongo-driver/bson/primitive" ) @@ -23,7 +18,7 @@ import ( var processedBookings sync.Map // closingStates is the set of terminal booking states. 
-var closingStates = map[enum.BookingStatus]bool{ +var ClosingStates = map[enum.BookingStatus]bool{ enum.FAILURE: true, enum.SUCCESS: true, enum.FORGOTTEN: true, @@ -92,7 +87,7 @@ func emitWatchdogFailure(b *bookingmodel.Booking) { if _, done := processedBookings.Load(b.GetID()); done { return } - if closingStates[b.State] { + if ClosingStates[b.State] { processedBookings.Store(b.GetID(), struct{}{}) return } @@ -115,126 +110,3 @@ func emitWatchdogFailure(b *bookingmodel.Booking) { logger.Info().Msgf("BookingWatchdog: booking %s stale → emitting FAILURE", b.GetID()) processedBookings.Store(b.GetID(), struct{}{}) } - -// ── Infra teardown helpers (called from nats.go on WORKFLOW_DONE_EVENT) ──────── - -// teardownAdmiraltyIfRemote triggers Admiralty TeardownAsTarget only when at -// least one compute booking for the execution is on a remote peer. -// Local executions do not involve Admiralty. -func teardownAdmiraltyIfRemote(exec *workflow_execution.WorkflowExecution, selfPeerID string) { - logger := oclib.GetLogger() - - res := oclib.NewRequest(oclib.LibDataEnum(oclib.BOOKING), "", selfPeerID, []string{}, nil). 
- Search(&dbs.Filters{ - And: map[string][]dbs.Filter{ - "executions_id": {{Operator: dbs.EQUAL.String(), Value: exec.ExecutionsID}}, - "resource_type": {{Operator: dbs.EQUAL.String(), Value: tools.COMPUTE_RESOURCE.EnumIndex()}}, - }, - }, "", false) - - if res.Err != "" || len(res.Data) == 0 { - return - } - - for _, dbo := range res.Data { - b, ok := dbo.(*bookingmodel.Booking) - if !ok { - continue - } - if b.DestPeerID != selfPeerID { - logger.Info().Msgf("InfraTeardown: Admiralty teardown exec=%s (remote peer=%s)", - exec.ExecutionsID, b.DestPeerID) - NewAdmiraltySetter(exec.ExecutionsID).TeardownAsTarget(context.Background(), selfPeerID) - return // one teardown per execution is enough - } - } -} - -// teardownMinioForExecution tears down all Minio configuration for the execution: -// - storage bookings where this peer is the compute target → TeardownAsTarget -// - storage bookings where this peer is the Minio source → TeardownAsSource -func teardownMinioForExecution(ctx context.Context, executionsID string, localPeerID string) { - logger := oclib.GetLogger() - - res := oclib.NewRequest(oclib.LibDataEnum(oclib.BOOKING), "", localPeerID, []string{}, nil). - Search(&dbs.Filters{ - And: map[string][]dbs.Filter{ - "executions_id": {{Operator: dbs.EQUAL.String(), Value: executionsID}}, - "resource_type": {{Operator: dbs.EQUAL.String(), Value: tools.LIVE_STORAGE.EnumIndex()}}, - }, - }, "", false) - - if res.Err != "" || len(res.Data) == 0 { - return - } - - for _, dbo := range res.Data { - b, ok := dbo.(*bookingmodel.Booking) - if !ok { - continue - } - if b.DestPeerID == localPeerID { - // This peer is the compute target: tear down K8s secret + configmap. 
- logger.Info().Msgf("InfraTeardown: Minio target teardown exec=%s storage=%s", executionsID, b.ResourceID) - event := minio.MinioDeleteEvent{ - ExecutionsID: executionsID, - MinioID: b.ResourceID, - SourcePeerID: b.DestPeerID, - DestPeerID: localPeerID, - OriginID: "", - } - minio.NewMinioSetter(executionsID, b.ResourceID).TeardownAsTarget(ctx, event) - } else { - // This peer is the Minio source: revoke SA + remove execution bucket. - logger.Info().Msgf("InfraTeardown: Minio source teardown exec=%s storage=%s", executionsID, b.ResourceID) - event := minio.MinioDeleteEvent{ - ExecutionsID: executionsID, - MinioID: b.ResourceID, - SourcePeerID: localPeerID, - DestPeerID: b.DestPeerID, - OriginID: "", - } - minio.NewMinioSetter(executionsID, b.ResourceID).TeardownAsSource(ctx, event) - } - } -} - -// teardownPVCForExecution deletes all local PVCs provisioned for the execution. -// It searches LIVE_STORAGE bookings and resolves the storage name via the live storage. -func teardownPVCForExecution(ctx context.Context, executionsID string, localPeerID string) { - logger := oclib.GetLogger() - - res := oclib.NewRequest(oclib.LibDataEnum(oclib.BOOKING), "", localPeerID, []string{}, nil). - Search(&dbs.Filters{ - And: map[string][]dbs.Filter{ - "executions_id": {{Operator: dbs.EQUAL.String(), Value: executionsID}}, - "resource_type": {{Operator: dbs.EQUAL.String(), Value: tools.LIVE_STORAGE.EnumIndex()}}, - }, - }, "", false) - - if res.Err != "" || len(res.Data) == 0 { - return - } - - for _, dbo := range res.Data { - b, ok := dbo.(*bookingmodel.Booking) - if !ok { - continue - } - // Resolve storage name from live storage to compute the claim name. 
- storageName := storage.ResolveStorageName(b.ResourceID, localPeerID) - if storageName == "" { - continue - } - logger.Info().Msgf("InfraTeardown: PVC teardown exec=%s storage=%s", executionsID, b.ResourceID) - event := storage.PVCDeleteEvent{ - ExecutionsID: executionsID, - StorageID: b.ResourceID, - StorageName: storageName, - SourcePeerID: localPeerID, - DestPeerID: b.DestPeerID, - OriginID: "", - } - storage.NewPVCSetter(executionsID, b.ResourceID).TeardownAsSource(ctx, event) - } -} diff --git a/infrastructure/kubernetes/kubernetes.go b/infrastructure/kubernetes/kubernetes.go new file mode 100644 index 0000000..e39d6b5 --- /dev/null +++ b/infrastructure/kubernetes/kubernetes.go @@ -0,0 +1,323 @@ +package kubernetes + +import ( + "context" + "encoding/base64" + "fmt" + "strings" + "sync" + "time" + + "oc-datacenter/conf" + + oclib "cloud.o-forge.io/core/oc-lib" + "cloud.o-forge.io/core/oc-lib/dbs" + "cloud.o-forge.io/core/oc-lib/models/allowed_image" + "cloud.o-forge.io/core/oc-lib/tools" + appsv1 "k8s.io/api/apps/v1" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" +) + +type KubernetesService struct { + ExecutionsID string +} + +func NewKubernetesService(executionsID string) *KubernetesService { + return &KubernetesService{ + ExecutionsID: executionsID, + } +} + +// prepullRegistry associe executionsID → images pre-pullées pour ce run. +// Utilisé par CleanupImages après WORKFLOW_DONE_EVENT. +var prepullRegistry sync.Map + +// RunPrepull crée un Job k8s dans le namespace executionsID qui pre-pull chaque +// image de la liste (imagePullPolicy: IfNotPresent). Bloque jusqu'à la complétion +// du Job ou timeout (5 min). Enregistre les images pour le cleanup post-exec. +func (s *KubernetesService) RunPrepull(ctx context.Context, images []string) error { + logger := oclib.GetLogger() + + // Toujours stocker pour le cleanup, même si le pull échoue. 
+ prepullRegistry.Store(s.ExecutionsID, images) + + if len(images) == 0 { + return nil + } + + cs, err := s.newClientset() + if err != nil { + return fmt.Errorf("RunPrepull: failed to build clientset: %w", err) + } + + // Un container par image — ils tournent tous en parallèle dans le même pod. + containers := make([]corev1.Container, 0, len(images)) + for i, img := range images { + containers = append(containers, corev1.Container{ + Name: fmt.Sprintf("prepull-%d", i), + Image: img, + ImagePullPolicy: corev1.PullIfNotPresent, + Command: []string{"true"}, + }) + } + + var backoff int32 = 0 + jobName := "prepull-" + s.ExecutionsID + job := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: jobName, + Namespace: s.ExecutionsID, + }, + Spec: batchv1.JobSpec{ + BackoffLimit: &backoff, + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + RestartPolicy: corev1.RestartPolicyNever, + Containers: containers, + }, + }, + }, + } + + if _, err := cs.BatchV1().Jobs(s.ExecutionsID).Create(ctx, job, metav1.CreateOptions{}); err != nil { + return fmt.Errorf("RunPrepull: failed to create job: %w", err) + } + + timeout := int64(300) // 5 min, cohérent avec waitForConsiders + watcher, err := cs.BatchV1().Jobs(s.ExecutionsID).Watch(ctx, metav1.ListOptions{ + FieldSelector: "metadata.name=" + jobName, + TimeoutSeconds: &timeout, + }) + if err != nil { + return fmt.Errorf("RunPrepull: failed to watch job: %w", err) + } + defer watcher.Stop() + + for event := range watcher.ResultChan() { + j, ok := event.Object.(*batchv1.Job) + if !ok { + continue + } + for _, cond := range j.Status.Conditions { + if cond.Type == batchv1.JobComplete && cond.Status == corev1.ConditionTrue { + logger.Info().Msgf("RunPrepull: job %s completed for ns %s", jobName, s.ExecutionsID) + return nil + } + if cond.Type == batchv1.JobFailed && cond.Status == corev1.ConditionTrue { + return fmt.Errorf("RunPrepull: job %s failed for ns %s", jobName, s.ExecutionsID) + } + } + } + return 
fmt.Errorf("RunPrepull: timeout waiting for job %s", jobName) +} + +// CleanupImages récupère les images pre-pullées pour ce run, filtre celles +// absentes de AllowedImages, et planifie leur suppression via un DaemonSet +// privilégié (crictl rmi) sur tous les nœuds du cluster. +// Appelé depuis teardownInfraForExecution au WORKFLOW_DONE_EVENT. +func (s *KubernetesService) CleanupImages(ctx context.Context) { + logger := oclib.GetLogger() + + raw, ok := prepullRegistry.LoadAndDelete(s.ExecutionsID) + if !ok { + return + } + images := raw.([]string) + if len(images) == 0 { + return + } + + toRemove := s.filterNonAllowed(images) + if len(toRemove) == 0 { + logger.Info().Msgf("CleanupImages: all images for %s are in AllowedImages, keeping", s.ExecutionsID) + return + } + + logger.Info().Msgf("CleanupImages: scheduling removal of %d image(s) for %s: %v", + len(toRemove), s.ExecutionsID, toRemove) + go s.scheduleImageRemoval(ctx, toRemove) +} + +// filterNonAllowed retourne les images non présentes dans AllowedImages. 
+func (s *KubernetesService) filterNonAllowed(images []string) []string { + var toRemove []string + for _, img := range images { + registry, name, tag := s.parseImage(img) + res := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.ALLOWED_IMAGE), nil).Search( + &dbs.Filters{ + And: map[string][]dbs.Filter{ + "image": {{Operator: dbs.EQUAL.String(), Value: name}}, + }, + }, "", false) + + if len(res.Data) == 0 { + toRemove = append(toRemove, img) + continue + } + + allowed := false + for _, d := range res.Data { + a, ok := d.(*allowed_image.AllowedImage) + if !ok { + continue + } + if a.Registry != "" && a.Registry != registry { + continue + } + if s.matchesTagConstraint(a.TagConstraint, tag) { + allowed = true + break + } + } + if !allowed { + toRemove = append(toRemove, img) + } + } + return toRemove +} + +// scheduleImageRemoval crée un DaemonSet privilégié sur tous les nœuds du cluster +// qui exécute "crictl rmi" pour chaque image à supprimer, puis supprime le DaemonSet. +func (s *KubernetesService) scheduleImageRemoval(ctx context.Context, images []string) { + logger := oclib.GetLogger() + + cs, err := s.newClientset() + if err != nil { + logger.Error().Msgf("scheduleImageRemoval: failed to build clientset: %v", err) + return + } + + // Commande shell : crictl rmi image1 image2 ... || true (best-effort) + args := strings.Join(images, " ") + cmd := fmt.Sprintf("crictl rmi %s || true", args) + + privileged := true + dsName := "oc-cleanup-" + s.ExecutionsID + + ds := &appsv1.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: dsName, + Namespace: "default", + Labels: map[string]string{"app": dsName}, + }, + Spec: appsv1.DaemonSetSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"app": dsName}, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{"app": dsName}, + }, + Spec: corev1.PodSpec{ + // Tolère tous les taints pour atteindre tous les nœuds. 
+ Tolerations: []corev1.Toleration{ + {Operator: corev1.TolerationOpExists}, + }, + HostPID: true, + Containers: []corev1.Container{{ + Name: "cleanup", + Image: "alpine:3", + // nsenter entre dans le namespace mount du host (PID 1) + // pour accéder au crictl installé sur le nœud. + Command: []string{"sh", "-c", + "nsenter -t 1 -m -u -i -n -- sh -c '" + cmd + "'"}, + SecurityContext: &corev1.SecurityContext{ + Privileged: &privileged, + }, + }}, + }, + }, + }, + } + + if _, err := cs.AppsV1().DaemonSets("default").Create(ctx, ds, metav1.CreateOptions{}); err != nil { + logger.Error().Msgf("scheduleImageRemoval: failed to create DaemonSet: %v", err) + return + } + + // Laisse le temps au DaemonSet de tourner sur tous les nœuds. + time.Sleep(30 * time.Second) + + if err := cs.AppsV1().DaemonSets("default").Delete(ctx, dsName, metav1.DeleteOptions{}); err != nil { + logger.Error().Msgf("scheduleImageRemoval: failed to delete DaemonSet: %v", err) + } + logger.Info().Msgf("scheduleImageRemoval: completed for %s", s.ExecutionsID) +} + +// parseImage décompose "registry/name:tag" en ses trois composants. +// registry vide si aucun composant ressemblant à un hostname n'est détecté. +func (s *KubernetesService) parseImage(image string) (registry, name, tag string) { + parts := strings.SplitN(image, ":", 2) + nameWithRegistry := parts[0] + if len(parts) == 2 { + tag = parts[1] + } else { + tag = "latest" + } + + slashIdx := strings.Index(nameWithRegistry, "/") + if slashIdx == -1 { + return "", nameWithRegistry, tag + } + prefix := nameWithRegistry[:slashIdx] + // Présence d'un "." ou ":" ou "localhost" → c'est un hostname de registry. + if strings.ContainsAny(prefix, ".:") || prefix == "localhost" { + return prefix, nameWithRegistry[slashIdx+1:], tag + } + return "", nameWithRegistry, tag +} + +// matchesTagConstraint vérifie si tag satisfait la contrainte. +// Vide = toutes versions. Supporte exact et glob suffixe ("3.*"). 
+func (s *KubernetesService) matchesTagConstraint(constraint, tag string) bool { + if constraint == "" { + return true + } + if strings.HasSuffix(constraint, "*") { + return strings.HasPrefix(tag, strings.TrimSuffix(constraint, "*")) + } + return constraint == tag +} + +// newClientset construit un client k8s depuis les credentials base64 en conf. +func (s *KubernetesService) newClientset() (*kubernetes.Clientset, error) { + caData, err := base64.StdEncoding.DecodeString(conf.GetConfig().KubeCA) + if err != nil { + return nil, fmt.Errorf("newClientset: invalid KubeCA: %w", err) + } + certData, err := base64.StdEncoding.DecodeString(conf.GetConfig().KubeCert) + if err != nil { + return nil, fmt.Errorf("newClientset: invalid KubeCert: %w", err) + } + keyData, err := base64.StdEncoding.DecodeString(conf.GetConfig().KubeData) + if err != nil { + return nil, fmt.Errorf("newClientset: invalid KubeData: %w", err) + } + cfg := &rest.Config{ + Host: "https://" + conf.GetConfig().KubeHost + ":" + conf.GetConfig().KubePort, + TLSClientConfig: rest.TLSClientConfig{ + CAData: caData, + CertData: certData, + KeyData: keyData, + }, + } + return kubernetes.NewForConfig(cfg) +} + +func (s *KubernetesService) CreateNamespace() error { + logger := oclib.GetLogger() + serv, err := tools.NewKubernetesService( + conf.GetConfig().KubeHost+":"+conf.GetConfig().KubePort, conf.GetConfig().KubeCA, + conf.GetConfig().KubeCert, conf.GetConfig().KubeData) + if err != nil { + logger.Error().Msg("CreateNamespace: failed to init k8s service: " + err.Error()) + return err + } + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + return serv.ProvisionExecutionNamespace(ctx, s.ExecutionsID) +} diff --git a/models/kubeconfig.go b/infrastructure/kubernetes/models/kubeconfig.go similarity index 72% rename from models/kubeconfig.go rename to infrastructure/kubernetes/models/kubeconfig.go index 3a14a83..db60cbe 100644 --- a/models/kubeconfig.go +++ 
b/infrastructure/kubernetes/models/kubeconfig.go @@ -15,12 +15,11 @@ type KubeConfigValue struct { type KubeconfigUser struct { Name string `yaml:"name" json:"name"` User KubeconfigUserKeyPair `yaml:"user" json:"user"` - } // KubeconfigUserKeyPair is a struct used to create a kubectl configuration YAML file type KubeconfigUserKeyPair struct { - Token string `yaml:"token" json:"token"` + Token string `yaml:"token" json:"token"` } // KubeconfigAuthProvider is a struct used to create a kubectl authentication provider @@ -53,4 +52,21 @@ type KubeconfigContext struct { Cluster string `yaml:"cluster" json:"cluster"` Namespace string `yaml:"namespace,omitempty" json:"namespace,omitempty"` User string `yaml:"user" json:"user"` -} \ No newline at end of file +} + +// KubeconfigEvent is the NATS payload used to transfer the kubeconfig from the source peer to the target peer. +type KubeconfigEvent struct { + DestPeerID string `json:"dest_peer_id"` + ExecutionsID string `json:"executions_id"` + Kubeconfig string `json:"kubeconfig"` + SourcePeerID string `json:"source_peer_id"` + // OriginID is the peer that initiated the provisioning request. + // The PB_CONSIDERS response is routed back to this peer. + OriginID string `json:"origin_id"` + // SourceExecutionsID is the execution namespace on the source cluster. + // Used by the target to provision PVCs with the correct claim name. + SourceExecutionsID string `json:"source_executions_id,omitempty"` + // Images is the list of container images to pre-pull on the compute peer + // before the workflow starts. 
+ Images []string `json:"images,omitempty"` +} diff --git a/infrastructure/infra_watchdog.go b/infrastructure/kubernetes/watchdog.go similarity index 83% rename from infrastructure/infra_watchdog.go rename to infrastructure/kubernetes/watchdog.go index 1a60a4f..9bd7d79 100644 --- a/infrastructure/infra_watchdog.go +++ b/infrastructure/kubernetes/watchdog.go @@ -1,4 +1,4 @@ -package infrastructure +package kubernetes import ( "context" @@ -8,7 +8,8 @@ import ( "time" "oc-datacenter/conf" - "oc-datacenter/infrastructure/minio" + "oc-datacenter/infrastructure" + "oc-datacenter/infrastructure/admiralty" "oc-datacenter/infrastructure/storage" oclib "cloud.o-forge.io/core/oc-lib" @@ -29,22 +30,22 @@ var uuidNsPattern = regexp.MustCompile(`^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0- // missed due to oc-monitord or oc-datacenter crash/restart). // // Must be launched in a goroutine from main. -func WatchInfra() { +func (s *KubernetesService) Watch() { logger := oclib.GetLogger() logger.Info().Msg("InfraWatchdog: started") ticker := time.NewTicker(5 * time.Minute) defer ticker.Stop() for range ticker.C { - if err := scanOrphanedInfra(); err != nil { + if err := s.scanOrphaned(); err != nil { logger.Error().Msg("InfraWatchdog: " + err.Error()) } - if err := scanOrphanedMinio(); err != nil { + if err := s.scanOrphanedMinio(); err != nil { logger.Error().Msg("InfraWatchdog(minio): " + err.Error()) } - if err := scanOrphanedAdmiraltyNodes(); err != nil { + if err := s.scanOrphanedAdmiraltyNodes(); err != nil { logger.Error().Msg("InfraWatchdog(admiralty-nodes): " + err.Error()) } - if err := scanOrphanedPVC(); err != nil { + if err := s.scanOrphanedPVC(); err != nil { logger.Error().Msg("InfraWatchdog(pvc): " + err.Error()) } } @@ -53,7 +54,7 @@ func WatchInfra() { // scanOrphanedInfra lists all UUID-named Kubernetes namespaces, looks up their // WorkflowExecution in the DB, and triggers teardown for any that are in a // terminal state. 
Namespaces already in Terminating phase are skipped. -func scanOrphanedInfra() error { +func (s *KubernetesService) scanOrphaned() error { logger := oclib.GetLogger() serv, err := tools.NewKubernetesService( @@ -97,7 +98,7 @@ func scanOrphanedInfra() error { logger.Info().Msgf("InfraWatchdog: orphaned infra detected for execution %s (state=%v) → teardown", executionsID, exec.State) - go teardownInfraForExecution(exec.GetID(), executionsID) + go s.TeardownForExecution(exec.GetID()) } return nil } @@ -106,7 +107,7 @@ func scanOrphanedInfra() error { // terminal state and triggers Minio teardown for each unique executionsID found. // This covers the case where the Kubernetes namespace is already gone (manual // deletion, prior partial teardown) but Minio SA and bucket were never revoked. -func scanOrphanedMinio() error { +func (s *KubernetesService) scanOrphanedMinio() error { logger := oclib.GetLogger() myself, err := oclib.GetMySelf() @@ -142,26 +143,26 @@ func scanOrphanedMinio() error { } seen[b.ExecutionsID] = true - + minio := storage.NewMinioSetter(b.ExecutionsID, b.ResourceID) // Determine this peer's role and call the appropriate teardown. 
if b.DestPeerID == peerID { logger.Info().Msgf("InfraWatchdog(minio): orphaned target resources for exec %s → TeardownAsTarget", b.ExecutionsID) - event := minio.MinioDeleteEvent{ + event := storage.MinioDeleteEvent{ ExecutionsID: b.ExecutionsID, MinioID: b.ResourceID, SourcePeerID: b.DestPeerID, DestPeerID: peerID, } - go minio.NewMinioSetter(b.ExecutionsID, b.ResourceID).TeardownAsTarget(ctx, event) + go minio.TeardownAsTarget(ctx, event) } else { logger.Info().Msgf("InfraWatchdog(minio): orphaned source resources for exec %s → TeardownAsSource", b.ExecutionsID) - event := minio.MinioDeleteEvent{ + event := storage.MinioDeleteEvent{ ExecutionsID: b.ExecutionsID, MinioID: b.ResourceID, SourcePeerID: peerID, DestPeerID: b.DestPeerID, } - go minio.NewMinioSetter(b.ExecutionsID, b.ResourceID).TeardownAsSource(ctx, event) + go minio.TeardownAsSource(ctx, event) } } return nil @@ -174,7 +175,7 @@ func scanOrphanedMinio() error { // This covers the gap where the namespace is already gone (or Terminating) but // the virtual node was never cleaned up by the Admiralty controller — which can // happen when the node goes NotReady before the AdmiraltyTarget CRD is deleted. -func scanOrphanedAdmiraltyNodes() error { +func (s *KubernetesService) scanOrphanedAdmiraltyNodes() error { logger := oclib.GetLogger() serv, err := tools.NewKubernetesService( @@ -250,7 +251,7 @@ func scanOrphanedAdmiraltyNodes() error { // // A LIVE_STORAGE booking is treated as a local PVC only when ResolveStorageName // returns a non-empty name — the same guard used by teardownPVCForExecution. 
-func scanOrphanedPVC() error { +func (s *KubernetesService) scanOrphanedPVC() error { logger := oclib.GetLogger() myself, err := oclib.GetMySelf() @@ -324,8 +325,35 @@ func findTerminalExecution(executionsID string, peerID string) *workflow_executi return nil } - if !closingStates[exec.State] { + if !infrastructure.ClosingStates[exec.State] { return nil } return exec } + +// TeardownForExecution handles infrastructure cleanup when a workflow terminates. +// oc-datacenter is responsible only for infra here — booking/execution state +// is managed by oc-scheduler. +func (s *KubernetesService) TeardownForExecution(executionID string) { + logger := oclib.GetLogger() + + myself, err := oclib.GetMySelf() + if err != nil || myself == nil { + return + } + selfPeerID := myself.GetID() + + adminReq := &tools.APIRequest{Admin: true} + res, _, loadErr := workflow_execution.NewAccessor(adminReq).LoadOne(executionID) + if loadErr != nil || res == nil { + logger.Warn().Msgf("teardownInfraForExecution: execution %s not found", executionID) + return + } + exec := res.(*workflow_execution.WorkflowExecution) + + ctx := context.Background() + admiralty.NewAdmiraltySetter(s.ExecutionsID).TeardownIfRemote(exec, selfPeerID) + storage.NewMinioSetter(s.ExecutionsID, "").TeardownForExecution(ctx, selfPeerID) + storage.NewPVCSetter(s.ExecutionsID, "").TeardownForExecution(ctx, selfPeerID) + s.CleanupImages(ctx) +} diff --git a/infrastructure/namespace.go b/infrastructure/namespace.go deleted file mode 100644 index afec5de..0000000 --- a/infrastructure/namespace.go +++ /dev/null @@ -1,28 +0,0 @@ -package infrastructure - -import ( - "context" - "oc-datacenter/conf" - "time" - - oclib "cloud.o-forge.io/core/oc-lib" - "cloud.o-forge.io/core/oc-lib/tools" -) - -// --------------------------------------------------------------------------- -// Kubernetes namespace helper -// --------------------------------------------------------------------------- - -func CreateNamespace(ns string) error 
{ - logger := oclib.GetLogger() - serv, err := tools.NewKubernetesService( - conf.GetConfig().KubeHost+":"+conf.GetConfig().KubePort, conf.GetConfig().KubeCA, - conf.GetConfig().KubeCert, conf.GetConfig().KubeData) - if err != nil { - logger.Error().Msg("CreateNamespace: failed to init k8s service: " + err.Error()) - return err - } - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - return serv.ProvisionExecutionNamespace(ctx, ns) -} diff --git a/infrastructure/nats.go b/infrastructure/nats/nats.go similarity index 78% rename from infrastructure/nats.go rename to infrastructure/nats/nats.go index 1f8ac24..3ecd2d7 100644 --- a/infrastructure/nats.go +++ b/infrastructure/nats/nats.go @@ -1,47 +1,22 @@ -package infrastructure +package nats import ( "context" "encoding/json" "fmt" - "oc-datacenter/infrastructure/minio" + "oc-datacenter/infrastructure/admiralty" + "oc-datacenter/infrastructure/kubernetes" + "oc-datacenter/infrastructure/kubernetes/models" "oc-datacenter/infrastructure/storage" "sync" oclib "cloud.o-forge.io/core/oc-lib" - "cloud.o-forge.io/core/oc-lib/models/workflow_execution" "cloud.o-forge.io/core/oc-lib/tools" ) // roleWaiters maps executionID → channel expecting the role-assignment message from OC discovery. var roleWaiters sync.Map -// teardownInfraForExecution handles infrastructure cleanup when a workflow terminates. -// oc-datacenter is responsible only for infra here — booking/execution state -// is managed by oc-scheduler. 
-func teardownInfraForExecution(executionID string, executionsID string) { - logger := oclib.GetLogger() - - myself, err := oclib.GetMySelf() - if err != nil || myself == nil { - return - } - selfPeerID := myself.GetID() - - adminReq := &tools.APIRequest{Admin: true} - res, _, loadErr := workflow_execution.NewAccessor(adminReq).LoadOne(executionID) - if loadErr != nil || res == nil { - logger.Warn().Msgf("teardownInfraForExecution: execution %s not found", executionID) - return - } - exec := res.(*workflow_execution.WorkflowExecution) - - ctx := context.Background() - teardownAdmiraltyIfRemote(exec, selfPeerID) - teardownMinioForExecution(ctx, executionsID, selfPeerID) - teardownPVCForExecution(ctx, executionsID, selfPeerID) -} - // ArgoKubeEvent carries the peer-routing metadata for a resource provisioning event. // // When MinioID is non-empty and Local is false, the event concerns Minio credential provisioning. @@ -59,6 +34,9 @@ type ArgoKubeEvent struct { // OriginID is the peer that initiated the request; the PB_CONSIDERS // response is routed back to this peer once provisioning completes. OriginID string `json:"origin_id,omitempty"` + // Images is the list of container images to pre-pull on the target peer + // before the workflow starts. Empty for STORAGE_RESOURCE events. + Images []string `json:"images,omitempty"` } // ListenNATS starts all NATS subscriptions for the infrastructure layer. 
@@ -73,6 +51,7 @@ func ListenNATS() { if err := json.Unmarshal(resp.Payload, argo); err != nil { return } + kube := kubernetes.NewKubernetesService(argo.ExecutionsID) if argo.Type == tools.STORAGE_RESOURCE { if argo.Local { @@ -89,7 +68,7 @@ func ListenNATS() { } if argo.SourcePeerID == argo.DestPeerID { fmt.Println("CONFIG PVC MYSELF") - err := CreateNamespace(argo.ExecutionsID) + err := kube.CreateNamespace() fmt.Println("NS", err) go setter.InitializeAsSource(context.Background(), event, true) } else { @@ -113,16 +92,16 @@ func ListenNATS() { } else { fmt.Println("DETECT STORAGE ARGO_KUBE_EVENT") // ── Minio credential provisioning ────────────────────────────── - setter := minio.NewMinioSetter(argo.ExecutionsID, argo.MinioID) + setter := storage.NewMinioSetter(argo.ExecutionsID, argo.MinioID) if argo.SourcePeerID == argo.DestPeerID { fmt.Println("CONFIG MYSELF") - err := CreateNamespace(argo.ExecutionsID) + err := kube.CreateNamespace() fmt.Println("NS", err) go setter.InitializeAsSource(context.Background(), argo.SourcePeerID, argo.DestPeerID, argo.OriginID, true) } else { // Different peers: publish Phase-1 PB_MINIO_CONFIG (Access == "") // so oc-discovery routes the role-assignment to the Minio host. 
- phase1 := minio.MinioCredentialEvent{ + phase1 := storage.MinioCredentialEvent{ ExecutionsID: argo.ExecutionsID, MinioID: argo.MinioID, SourcePeerID: argo.SourcePeerID, @@ -148,14 +127,25 @@ func ListenNATS() { } } else { fmt.Println("DETECT COMPUTE ARGO_KUBE_EVENT") - // ── Admiralty kubeconfig provisioning (existing behaviour) ────── + // ── Pre-pull + Admiralty kubeconfig provisioning ───────────── fmt.Println(argo.SourcePeerID, argo.DestPeerID) if argo.SourcePeerID == argo.DestPeerID { fmt.Println("CONFIG MYSELF") - err := CreateNamespace(argo.ExecutionsID) + kube := kubernetes.NewKubernetesService(argo.ExecutionsID) + err := kube.CreateNamespace() fmt.Println("NS", err) - go NewAdmiraltySetter(argo.ExecutionsID).InitializeAsSource( - context.Background(), argo.SourcePeerID, argo.DestPeerID, argo.OriginID, true) + go func(a ArgoKubeEvent) { + ctx := context.Background() + // Pre-pull en premier : PB_CONSIDERS n'est envoyé qu'après. + if len(a.Images) > 0 { + if err := kube.RunPrepull(ctx, a.Images); err != nil { + logger := oclib.GetLogger() + logger.Error().Msgf("RunPrepull local: %v", err) + } + } + admiralty.NewAdmiraltySetter(a.ExecutionsID).InitializeAsSource( + ctx, a.SourcePeerID, a.DestPeerID, a.OriginID, true, a.Images) + }(*argo) } else if b, err := json.Marshal(argo); err == nil { if b2, err := json.Marshal(&tools.PropalgationMessage{ Payload: b, @@ -179,20 +169,28 @@ func ListenNATS() { // Payload is a KubeconfigEvent (phase discriminated by Kubeconfig presence). tools.ADMIRALTY_CONFIG_EVENT: func(resp tools.NATSResponse) { - kubeconfigEvent := KubeconfigEvent{} + kubeconfigEvent := models.KubeconfigEvent{} if err := json.Unmarshal(resp.Payload, &kubeconfigEvent); err == nil { if kubeconfigEvent.Kubeconfig != "" { // Phase 2: kubeconfig present → this peer is the TARGET (scheduler). 
fmt.Println("CreateAdmiraltyTarget") - NewAdmiraltySetter(kubeconfigEvent.ExecutionsID).InitializeAsTarget( + admiralty.NewAdmiraltySetter(kubeconfigEvent.ExecutionsID).InitializeAsTarget( context.Background(), kubeconfigEvent, false) } else { - err := CreateNamespace(kubeconfigEvent.ExecutionsID) + kube := kubernetes.NewKubernetesService(kubeconfigEvent.ExecutionsID) + err := kube.CreateNamespace() fmt.Println("NS", err) // Phase 1: no kubeconfig → this peer is the SOURCE (compute). + if len(kubeconfigEvent.Images) > 0 { + if err := kube.RunPrepull(context.Background(), kubeconfigEvent.Images); err != nil { + logger := oclib.GetLogger() + logger.Error().Msgf("RunPrepull local: %v", err) + } + } fmt.Println("CreateAdmiraltySource") - NewAdmiraltySetter(kubeconfigEvent.ExecutionsID).InitializeAsSource( - context.Background(), kubeconfigEvent.SourcePeerID, kubeconfigEvent.DestPeerID, kubeconfigEvent.OriginID, false) + admiralty.NewAdmiraltySetter(kubeconfigEvent.ExecutionsID).InitializeAsSource( + context.Background(), kubeconfigEvent.SourcePeerID, kubeconfigEvent.DestPeerID, + kubeconfigEvent.OriginID, false, kubeconfigEvent.Images) } } }, @@ -201,17 +199,17 @@ func ListenNATS() { // Forwarded by oc-discovery after receiving via libp2p ProtocolMinioConfigResource. // Payload is a MinioCredentialEvent (phase discriminated by Access presence). tools.MINIO_CONFIG_EVENT: func(resp tools.NATSResponse) { - minioEvent := minio.MinioCredentialEvent{} + minioEvent := storage.MinioCredentialEvent{} if err := json.Unmarshal(resp.Payload, &minioEvent); err == nil { if minioEvent.Access != "" { // Phase 2: credentials present → this peer is the TARGET (compute). 
- minio.NewMinioSetter(minioEvent.ExecutionsID, minioEvent.MinioID).InitializeAsTarget( + storage.NewMinioSetter(minioEvent.ExecutionsID, minioEvent.MinioID).InitializeAsTarget( context.Background(), minioEvent, false) } else { - err := CreateNamespace(minioEvent.ExecutionsID) + err := kubernetes.NewKubernetesService(minioEvent.ExecutionsID).CreateNamespace() fmt.Println("NS", err) // Phase 1: no credentials → this peer is the SOURCE (Minio host). - minio.NewMinioSetter(minioEvent.ExecutionsID, minioEvent.MinioID).InitializeAsSource( + storage.NewMinioSetter(minioEvent.ExecutionsID, minioEvent.MinioID).InitializeAsSource( context.Background(), minioEvent.SourcePeerID, minioEvent.DestPeerID, minioEvent.OriginID, false) } } @@ -223,7 +221,7 @@ func ListenNATS() { tools.PVC_CONFIG_EVENT: func(resp tools.NATSResponse) { event := storage.PVCProvisionEvent{} if err := json.Unmarshal(resp.Payload, &event); err == nil { - err := CreateNamespace(event.ExecutionsID) + err := kubernetes.NewKubernetesService(event.ExecutionsID).CreateNamespace() fmt.Println("NS", err) storage.NewPVCSetter(event.ExecutionsID, event.StorageID).InitializeAsSource( context.Background(), event, false) @@ -239,7 +237,7 @@ func ListenNATS() { if err := json.Unmarshal(resp.Payload, &evt); err != nil || evt.ExecutionsID == "" { return } - go teardownInfraForExecution(evt.ExecutionID, evt.ExecutionsID) + go kubernetes.NewKubernetesService(evt.ExecutionsID).TeardownForExecution(evt.ExecutionID) }, // ─── REMOVE_RESOURCE ──────────────────────────────────────────────────────── @@ -254,16 +252,16 @@ func ListenNATS() { go storage.NewPVCSetter(pvcEvent.ExecutionsID, pvcEvent.StorageID). TeardownAsSource(context.Background(), pvcEvent) } else { - deleteEvent := minio.MinioDeleteEvent{} + deleteEvent := storage.MinioDeleteEvent{} if err := json.Unmarshal(resp.Payload, &deleteEvent); err == nil && deleteEvent.ExecutionsID != "" { - go minio.NewMinioSetter(deleteEvent.ExecutionsID, deleteEvent.MinioID). 
+ go storage.NewMinioSetter(deleteEvent.ExecutionsID, deleteEvent.MinioID). TeardownAsSource(context.Background(), deleteEvent) } } case tools.COMPUTE_RESOURCE: argo := &ArgoKubeEvent{} if err := json.Unmarshal(resp.Payload, argo); err == nil && argo.ExecutionsID != "" { - go NewAdmiraltySetter(argo.ExecutionsID).TeardownAsSource(context.Background()) + go admiralty.NewAdmiraltySetter(argo.ExecutionsID).TeardownAsSource(context.Background()) } } }, diff --git a/infrastructure/minio/minio.go b/infrastructure/storage/minio.go similarity index 99% rename from infrastructure/minio/minio.go rename to infrastructure/storage/minio.go index f8fbec5..e2ebc21 100644 --- a/infrastructure/minio/minio.go +++ b/infrastructure/storage/minio.go @@ -1,4 +1,4 @@ -package minio +package storage import ( "context" diff --git a/infrastructure/minio/minio_setter.go b/infrastructure/storage/minio_setter.go similarity index 83% rename from infrastructure/minio/minio_setter.go rename to infrastructure/storage/minio_setter.go index af1cf2f..a5381c7 100644 --- a/infrastructure/minio/minio_setter.go +++ b/infrastructure/storage/minio_setter.go @@ -1,4 +1,4 @@ -package minio +package storage import ( "context" @@ -9,6 +9,8 @@ import ( "oc-datacenter/conf" oclib "cloud.o-forge.io/core/oc-lib" + "cloud.o-forge.io/core/oc-lib/dbs" + bookingmodel "cloud.o-forge.io/core/oc-lib/models/booking" "cloud.o-forge.io/core/oc-lib/models/live" "cloud.o-forge.io/core/oc-lib/tools" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -42,11 +44,21 @@ type minioConsidersPayload struct { Error *string `json:"error,omitempty"` } +// MinioSetter carries the execution context for a Minio credential provisioning. 
+type MinioSetter struct { + ExecutionsID string // used as both the bucket name and the K8s namespace suffix + MinioID string // ID of the Minio storage resource +} + +func NewMinioSetter(execID, minioID string) *MinioSetter { + return &MinioSetter{ExecutionsID: execID, MinioID: minioID} +} + // emitConsiders publishes a PB_CONSIDERS back to OriginID with the result of // the minio provisioning. secret is the provisioned credential; err is nil on success. // When self is true the origin is the local peer: emits directly on CONSIDERS_EVENT // instead of routing through PROPALGATION_EVENT. -func emitConsiders(executionsID, originID, secret string, provErr error, self bool) { +func (m *MinioSetter) emitConsiders(executionsID, originID, secret string, provErr error, self bool) { fmt.Println("emitConsiders !") var errStr *string if provErr != nil { @@ -81,16 +93,6 @@ func emitConsiders(executionsID, originID, secret string, provErr error, self bo }) } -// MinioSetter carries the execution context for a Minio credential provisioning. -type MinioSetter struct { - ExecutionsID string // used as both the bucket name and the K8s namespace suffix - MinioID string // ID of the Minio storage resource -} - -func NewMinioSetter(execID, minioID string) *MinioSetter { - return &MinioSetter{ExecutionsID: execID, MinioID: minioID} -} - // InitializeAsSource is called on the peer that hosts the Minio instance. 
// // It: @@ -187,18 +189,18 @@ func (m *MinioSetter) InitializeAsTarget(ctx context.Context, event MinioCredent if err := k.CreateSecret(ctx, event.MinioID, event.ExecutionsID, event.Access, event.Secret); err != nil { logger.Error().Msg("MinioSetter.InitializeAsTarget: failed to create k8s secret: " + err.Error()) - emitConsiders(event.ExecutionsID, event.OriginID, "", err, self) + m.emitConsiders(event.ExecutionsID, event.OriginID, "", err, self) return } if err := NewMinioService(event.URL).CreateMinioConfigMap(event.MinioID, event.ExecutionsID, event.URL); err != nil { logger.Error().Msg("MinioSetter.InitializeAsTarget: failed to create config map: " + err.Error()) - emitConsiders(event.ExecutionsID, event.OriginID, "", err, self) + m.emitConsiders(event.ExecutionsID, event.OriginID, "", err, self) return } logger.Info().Msg("MinioSetter.InitializeAsTarget: Minio credentials stored in namespace " + event.ExecutionsID) - emitConsiders(event.ExecutionsID, event.OriginID, event.Secret, nil, self) + m.emitConsiders(event.ExecutionsID, event.OriginID, event.Secret, nil, self) } // MinioDeleteEvent is the NATS payload used to tear down Minio resources. 
@@ -227,7 +229,7 @@ func (m *MinioSetter) TeardownAsTarget(ctx context.Context, event MinioDeleteEve ) if err != nil { logger.Error().Msg("MinioSetter.TeardownAsTarget: failed to create k8s service: " + err.Error()) - emitConsiders(event.ExecutionsID, event.OriginID, "", err, event.SourcePeerID == event.DestPeerID) + m.emitConsiders(event.ExecutionsID, event.OriginID, "", err, event.SourcePeerID == event.DestPeerID) return } @@ -309,3 +311,52 @@ func (m *MinioSetter) loadMinioURL(peerID string) (string, error) { } return "", fmt.Errorf("loadMinioURL: no live storage found for minio ID %s", m.MinioID) } + +// TeardownForExecution tears down all Minio configuration for the execution: +// - storage bookings where this peer is the compute target → TeardownAsTarget +// - storage bookings where this peer is the Minio source → TeardownAsSource +func (m *MinioSetter) TeardownForExecution(ctx context.Context, localPeerID string) { + logger := oclib.GetLogger() + + res := oclib.NewRequest(oclib.LibDataEnum(oclib.BOOKING), "", localPeerID, []string{}, nil). + Search(&dbs.Filters{ + And: map[string][]dbs.Filter{ + "executions_id": {{Operator: dbs.EQUAL.String(), Value: m.ExecutionsID}}, + "resource_type": {{Operator: dbs.EQUAL.String(), Value: tools.LIVE_STORAGE.EnumIndex()}}, + }, + }, "", false) + + if res.Err != "" || len(res.Data) == 0 { + return + } + + for _, dbo := range res.Data { + b, ok := dbo.(*bookingmodel.Booking) + if !ok { + continue + } + if b.DestPeerID == localPeerID { + // This peer is the compute target: tear down K8s secret + configmap. + logger.Info().Msgf("InfraTeardown: Minio target teardown exec=%s storage=%s", m.ExecutionsID, b.ResourceID) + event := MinioDeleteEvent{ + ExecutionsID: m.ExecutionsID, + MinioID: b.ResourceID, + SourcePeerID: b.DestPeerID, + DestPeerID: localPeerID, + OriginID: "", + } + m.TeardownAsTarget(ctx, event) + } else { + // This peer is the Minio source: revoke SA + remove execution bucket. 
+ logger.Info().Msgf("InfraTeardown: Minio source teardown exec=%s storage=%s", m.ExecutionsID, b.ResourceID) + event := MinioDeleteEvent{ + ExecutionsID: m.ExecutionsID, + MinioID: b.ResourceID, + SourcePeerID: localPeerID, + DestPeerID: b.DestPeerID, + OriginID: "", + } + m.TeardownAsSource(ctx, event) + } + } +} diff --git a/infrastructure/storage/pvc_setter.go b/infrastructure/storage/pvc_setter.go index 9ac7e7e..521ef7c 100644 --- a/infrastructure/storage/pvc_setter.go +++ b/infrastructure/storage/pvc_setter.go @@ -10,6 +10,8 @@ import ( "oc-datacenter/conf" oclib "cloud.o-forge.io/core/oc-lib" + "cloud.o-forge.io/core/oc-lib/dbs" + bookingmodel "cloud.o-forge.io/core/oc-lib/models/booking" "cloud.o-forge.io/core/oc-lib/models/live" "cloud.o-forge.io/core/oc-lib/tools" ) @@ -44,13 +46,22 @@ func ClaimName(storageName, executionsID string) string { type PVCSetter struct { ExecutionsID string StorageID string + // ClaimSuffix overrides ExecutionsID as the suffix in ClaimName when non-empty. + // Used when the PVC namespace differs from the claim name suffix (Admiralty target). + ClaimSuffix string } func NewPVCSetter(execID, storageID string) *PVCSetter { return &PVCSetter{ExecutionsID: execID, StorageID: storageID} } -func emitConsiders(executionsID, originID string, provErr error, self bool) { +// NewPVCSetterWithClaimSuffix creates a PVCSetter where the claim name suffix +// differs from the execution namespace (e.g. Admiralty target provisioning). 
+func NewPVCSetterWithClaimSuffix(storageID, claimSuffix string) *PVCSetter { + return &PVCSetter{StorageID: storageID, ClaimSuffix: claimSuffix} +} + +func (p *PVCSetter) emitConsiders(executionsID, originID string, provErr error, self bool) { type pvcConsidersPayload struct { OriginID string `json:"origin_id"` ExecutionsID string `json:"executions_id"` @@ -96,7 +107,7 @@ func (p *PVCSetter) InitializeAsSource(ctx context.Context, event PVCProvisionEv sizeStr, err := p.loadStorageSize(event.SourcePeerID) if err != nil { logger.Error().Msg("PVCSetter.InitializeAsSource: " + err.Error()) - emitConsiders(event.ExecutionsID, event.OriginID, err, self) + p.emitConsiders(event.ExecutionsID, event.OriginID, err, self) return } @@ -106,19 +117,23 @@ func (p *PVCSetter) InitializeAsSource(ctx context.Context, event PVCProvisionEv ) if err != nil { logger.Error().Msg("PVCSetter.InitializeAsSource: failed to create k8s service: " + err.Error()) - emitConsiders(event.ExecutionsID, event.OriginID, err, self) + p.emitConsiders(event.ExecutionsID, event.OriginID, err, self) return } - claimName := ClaimName(event.StorageName, event.ExecutionsID) + claimSuffix := event.ExecutionsID + if p.ClaimSuffix != "" { + claimSuffix = p.ClaimSuffix + } + claimName := ClaimName(event.StorageName, claimSuffix) if err := k.CreatePVC(ctx, claimName, event.ExecutionsID, sizeStr); err != nil { logger.Error().Msg("PVCSetter.InitializeAsSource: failed to create PVC: " + err.Error()) - emitConsiders(event.ExecutionsID, event.OriginID, err, self) + p.emitConsiders(event.ExecutionsID, event.OriginID, err, self) return } logger.Info().Msg("PVCSetter.InitializeAsSource: PVC " + claimName + " created in " + event.ExecutionsID) - emitConsiders(event.ExecutionsID, event.OriginID, nil, self) + p.emitConsiders(event.ExecutionsID, event.OriginID, nil, self) } // TeardownAsSource deletes the PVC from the execution namespace. 
@@ -172,3 +187,44 @@ func (p *PVCSetter) loadStorageSize(peerID string) (string, error) {
 	}
 	return "10Gi", nil
 }
+
+// TeardownForExecution deletes all local PVCs provisioned for the execution.
+// It searches LIVE_STORAGE bookings and resolves the storage name via the live storage.
+func (p *PVCSetter) TeardownForExecution(ctx context.Context, localPeerID string) {
+	logger := oclib.GetLogger()
+
+	res := oclib.NewRequest(oclib.LibDataEnum(oclib.BOOKING), "", localPeerID, []string{}, nil).
+		Search(&dbs.Filters{
+			And: map[string][]dbs.Filter{
+				"executions_id": {{Operator: dbs.EQUAL.String(), Value: p.ExecutionsID}},
+				"resource_type": {{Operator: dbs.EQUAL.String(), Value: tools.LIVE_STORAGE.EnumIndex()}},
+			},
+		}, "", false)
+
+	if res.Err != "" || len(res.Data) == 0 {
+		return
+	}
+
+	for _, dbo := range res.Data {
+		b, ok := dbo.(*bookingmodel.Booking)
+		if !ok {
+			continue
+		}
+		// Resolve storage name from live storage to compute the claim name.
+		storageName := ResolveStorageName(b.ResourceID, localPeerID)
+		if storageName == "" {
+			continue
+		}
+		logger.Info().Msgf("InfraTeardown: PVC teardown exec=%s storage=%s", p.ExecutionsID, b.ResourceID)
+		event := PVCDeleteEvent{
+			ExecutionsID: p.ExecutionsID,
+			StorageID:    b.ResourceID,
+			StorageName:  storageName,
+			SourcePeerID: localPeerID,
+			DestPeerID:   b.DestPeerID,
+			OriginID:     "",
+		}
+		p.StorageID = b.ResourceID
+		p.TeardownAsSource(ctx, event)
+	}
+}
diff --git a/main.go b/main.go
index ed46659..fed919d 100644
--- a/main.go
+++ b/main.go
@@ -28,6 +28,8 @@ func main() {
 	conf.GetConfig().MinioRootSecret = o.GetStringDefault("MINIO_ADMIN_SECRET", "")
 
 	oclib.InitAPI(appname)
+
+	infrastructure.BootstrapAllowedImages()
 	go infrastructure.ListenNATS()
 	go infrastructure.WatchBookings()
 	go infrastructure.WatchInfra()
diff --git a/routers/router.go b/routers/router.go
index 79a291b..9119a39 100644
--- a/routers/router.go
+++ b/routers/router.go
@@ -29,6 +29,11 @@ func init() {
 				&controllers.VersionController{},
 			),
), + beego.NSNamespace("/allowed-image", + beego.NSInclude( + &controllers.AllowedImageController{}, + ), + ), ) beego.AddNamespace(ns)