diff --git a/conf/config.go b/conf/config.go index f2b1879..d16f9d2 100644 --- a/conf/config.go +++ b/conf/config.go @@ -6,13 +6,17 @@ type Config struct { Mode string KubeHost string KubePort string - KubeCA string - KubeCert string - KubeData string - MinioRootKey string - MinioRootSecret string - MonitorMode string - MonitorAddress string + // KubeExternalHost is the externally reachable address of this cluster's API server. + // Used when generating kubeconfigs for remote peers. Must be an IP or hostname + // reachable from outside the cluster (NOT kubernetes.default.svc.cluster.local). + KubeExternalHost string + KubeCA string + KubeCert string + KubeData string + MinioRootKey string + MinioRootSecret string + MonitorMode string + MonitorAddress string } var instance *Config diff --git a/controllers/error_handler.go b/controllers/error_handler.go deleted file mode 100644 index 7700807..0000000 --- a/controllers/error_handler.go +++ /dev/null @@ -1,21 +0,0 @@ -package controllers - -import ( - "fmt" - - beego "github.com/beego/beego/v2/server/web" -) - -func HandleControllerErrors(c beego.Controller, code int, err *error, data *map[string]interface{}, messages ...string) { - for _, mess := range messages { - fmt.Println(mess) - } - if data != nil { - c.Data["json"] = data - } - if err != nil { - c.Data["json"] = map[string]string{"error": (*err).Error()} - } - c.Ctx.Output.SetStatus(code) - c.ServeJSON() -} \ No newline at end of file diff --git a/controllers/session.go b/controllers/session.go index 1a7d052..5ae88f8 100644 --- a/controllers/session.go +++ b/controllers/session.go @@ -1,7 +1,6 @@ package controllers import ( - "fmt" "oc-datacenter/conf" "strconv" @@ -41,7 +40,6 @@ func (o *SessionController) GetToken() { o.ServeJSON() return } - fmt.Println("BLAPO", id, duration) token, err := serv.GenerateToken(o.Ctx.Request.Context(), id, duration) if err != nil { // change code to 500 diff --git a/docker_datacenter.json b/docker_datacenter.json index 1f9013e..1b5ea0e 100644 --- a/docker_datacenter.json +++ b/docker_datacenter.json @@ -1,5 +1,11 @@ { "MONGO_URL":"mongodb://mongo:27017/", "NATS_URL":"nats://nats:4222", - "MONGO_DATABASE":"DC_myDC" -} \ No newline at end of file + "MONGO_DATABASE":"DC_myDC", + "KUBERNETES_SERVICE_HOST": "kubernetes.default.svc.cluster.local", + "KUBERNETES_SERVICE_PORT": "6443", + "KUBE_EXTERNAL_HOST": "", + "KUBE_CA": "LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJkekNDQVIyZ0F3SUJBZ0lCQURBS0JnZ3Foa2pPUFFRREFqQWpNU0V3SHdZRFZRUUREQmhyTTNNdGMyVnkKZG1WeUxXTmhRREUzTnpReU56STVNVEF3SGhjTk1qWXdNekl6TVRNek5URXdXaGNOTXpZd016SXdNVE16TlRFdwpXakFqTVNFd0h3WURWUVFEREJock0zTXRjMlZ5ZG1WeUxXTmhRREUzTnpReU56STVNVEF3V1RBVEJnY3Foa2pPClBRSUJCZ2dxaGtqT1BRTUJCd05DQUFSSGpYRDVpbnRIYWZWSk5VaDFlRnIxcXBKdFlkUmc5NStKVENEa0tadTIKYjUxRXlKaG1zanRIY3BDUndGL1VGMzlvdzY4TFBUcjBxaUorUHlhQTBLZUtvMEl3UURBT0JnTlZIUThCQWY4RQpCQU1DQXFRd0R3WURWUjBUQVFIL0JBVXdBd0VCL3pBZEJnTlZIUTRFRmdRVTdWQkNzZVN3ajJ2cmczMFE5UG8vCnV6ZzAvMjR3Q2dZSUtvWkl6ajBFQXdJRFNBQXdSUUloQUlEOVY2aFlUSS83ZW1hRzU0dDdDWVU3TXFSdDdESUkKNlgvSUwrQ0RLbzlNQWlCdlFEMGJmT0tVWDc4UmRGdUplcEhEdWFUMUExaGkxcWdIUGduM1dZdDBxUT09Ci0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0K", + "KUBE_CERT": 
"LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJrVENDQVRlZ0F3SUJBZ0lJUU5KbFNJQUJPMDR3Q2dZSUtvWkl6ajBFQXdJd0l6RWhNQjhHQTFVRUF3d1kKYXpOekxXTnNhV1Z1ZEMxallVQXhOemMwTWpjeU9URXdNQjRYRFRJMk1ETXlNekV6TXpVeE1Gb1hEVEkzTURNeQpNekV6TXpVeE1Gb3dNREVYTUJVR0ExVUVDaE1PYzNsemRHVnRPbTFoYzNSbGNuTXhGVEFUQmdOVkJBTVRESE41CmMzUmxiVHBoWkcxcGJqQlpNQk1HQnlxR1NNNDlBZ0VHQ0NxR1NNNDlBd0VIQTBJQUJMY3Uwb2pUbVg4RFhTQkYKSHZwZDZNVEoyTHdXc1lRTmdZVURXRDhTVERIUWlCczlMZ0x5ZTdOMEFvZk85RkNZVW1HamhiaVd3WFVHR3dGTgpUdlRMU2lXalNEQkdNQTRHQTFVZER3RUIvd1FFQXdJRm9EQVRCZ05WSFNVRUREQUtCZ2dyQmdFRkJRY0RBakFmCkJnTlZIU01FR0RBV2dCUlJhRW9wQzc5NGJyTHlnR0g5SVhvbDZTSmlFREFLQmdncWhrak9QUVFEQWdOSUFEQkYKQWlFQWhaRUlrSWV3Y1loL1NmTFVCVjE5MW1CYTNRK0J5S2J5eTVlQmpwL3kzeWtDSUIxWTJicTVOZTNLUUU4RAprNnNzeFJrbjJmN0VoWWVRQU1pUlJ2MjIweDNLCi0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0KLS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJkekNDQVIyZ0F3SUJBZ0lCQURBS0JnZ3Foa2pPUFFRREFqQWpNU0V3SHdZRFZRUUREQmhyTTNNdFkyeHAKWlc1MExXTmhRREUzTnpReU56STVNVEF3SGhjTk1qWXdNekl6TVRNek5URXdXaGNOTXpZd016SXdNVE16TlRFdwpXakFqTVNFd0h3WURWUVFEREJock0zTXRZMnhwWlc1MExXTmhRREUzTnpReU56STVNVEF3V1RBVEJnY3Foa2pPClBRSUJCZ2dxaGtqT1BRTUJCd05DQUFTcTdVTC85MEc1ZmVTaE95NjI3eGFZWlM5dHhFdWFoWFQ3Vk5wZkpQSnMKaEdXd2UxOXdtbXZzdlp6dlNPUWFRSzJaMmttN0hSb1IrNlA1YjIyamczbHVvMEl3UURBT0JnTlZIUThCQWY4RQpCQU1DQXFRd0R3WURWUjBUQVFIL0JBVXdBd0VCL3pBZEJnTlZIUTRFRmdRVVVXaEtLUXUvZUc2eThvQmgvU0Y2Ckpla2lZaEF3Q2dZSUtvWkl6ajBFQXdJRFNBQXdSUUloQUk3cGxHczFtV20ySDErbjRobDBNTk13RmZzd0o5ZXIKTzRGVkM0QzhwRG44QWlCN3NZMVFwd2M5VkRUeGNZaGxuZzZNUzRXai85K0lHWjJxcy94UStrMjdTQT09Ci0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0K", + "KUBE_DATA": "LS0tLS1CRUdJTiBFQyBQUklWQVRFIEtFWS0tLS0tCk1IY0NBUUVFSUROZDRnWXd6aVRhK1hwNnFtNVc3SHFzc1JJNkREaUJTbUV2ZHoxZzk3VGxvQW9HQ0NxR1NNNDkKQXdFSG9VUURRZ0FFdHk3U2lOT1pmd05kSUVVZStsM294TW5ZdkJheGhBMkJoUU5ZUHhKTU1kQ0lHejB1QXZKNwpzM1FDaDg3MFVKaFNZYU9GdUpiQmRRWWJBVTFPOU10S0pRPT0KLS0tLS1FTkQgRUMgUFJJVkFURSBLRVktLS0tLQo=" +} diff --git a/go.mod b/go.mod index ad9eb3c..f4306be 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module oc-datacenter go 1.25.0 require ( - cloud.o-forge.io/core/oc-lib v0.0.0-20260319071818-28b5b7d39ffe + cloud.o-forge.io/core/oc-lib v0.0.0-20260323152020-211339947c46 github.com/beego/beego/v2 v2.3.8 github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 github.com/minio/madmin-go/v4 v4.1.1 diff --git a/go.sum b/go.sum index 4dc6f7c..2785808 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,15 @@ cloud.o-forge.io/core/oc-lib v0.0.0-20260319071818-28b5b7d39ffe h1:CHiWQAX7j/bMfbytCWGL2mUgSWYoDY4+bFQbCHEfypk= cloud.o-forge.io/core/oc-lib v0.0.0-20260319071818-28b5b7d39ffe/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= +cloud.o-forge.io/core/oc-lib v0.0.0-20260323080307-5bdd2554a769 h1:TYluuZ28s58KqXrh3Z4nTYje3TVcLJN3VJwVwF9uP0M= +cloud.o-forge.io/core/oc-lib v0.0.0-20260323080307-5bdd2554a769/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= +cloud.o-forge.io/core/oc-lib v0.0.0-20260323105321-14b449f5473b h1:ouGEzCLGLjUOQ0ciowv9yJv3RhylvUg1GTUlOqXHCSc= +cloud.o-forge.io/core/oc-lib v0.0.0-20260323105321-14b449f5473b/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= +cloud.o-forge.io/core/oc-lib v0.0.0-20260323111629-fa9893e1508c h1:4T+SJgpeK9+lpVQq68chTiAKdaevwvKYo/veP/cOFRY= +cloud.o-forge.io/core/oc-lib v0.0.0-20260323111629-fa9893e1508c/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= +cloud.o-forge.io/core/oc-lib v0.0.0-20260323112935-b76b22a8fbee h1:XQ85OdhYry8zolODV0ezS6+Ari36SpXcnRSbP4E6v2k= +cloud.o-forge.io/core/oc-lib v0.0.0-20260323112935-b76b22a8fbee/go.mod 
h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= +cloud.o-forge.io/core/oc-lib v0.0.0-20260323152020-211339947c46 h1:71WVrnLj0SM6PfQxCh25b2JGcL/1MZ2lYt254R/8n28= +cloud.o-forge.io/core/oc-lib v0.0.0-20260323152020-211339947c46/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/Masterminds/semver/v3 v3.4.0 h1:Zog+i5UMtVoCU8oKka5P7i9q9HgrJeGzI9SA1Xbatp0= github.com/Masterminds/semver/v3 v3.4.0/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM= diff --git a/infrastructure/admiralty.go b/infrastructure/admiralty.go index f72e921..91a3466 100644 --- a/infrastructure/admiralty.go +++ b/infrastructure/admiralty.go @@ -4,7 +4,9 @@ import ( "context" "encoding/base64" "encoding/json" + "errors" "fmt" + "strings" "sync" "time" @@ -14,7 +16,6 @@ import ( oclib "cloud.o-forge.io/core/oc-lib" "cloud.o-forge.io/core/oc-lib/tools" - apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" ) @@ -43,7 +44,9 @@ type admiraltyConsidersPayload struct { // emitAdmiraltyConsiders publishes a PB_CONSIDERS back to OriginID with the result // of the admiralty provisioning. secret is the base64-encoded kubeconfig; err is nil on success. -func emitAdmiraltyConsiders(executionsID, originID, secret string, provErr error) { +// When self is true the origin is the local peer: emits directly on CONSIDERS_EVENT +// instead of routing through PROPALGATION_EVENT. +func emitAdmiraltyConsiders(executionsID, originID, secret string, provErr error, self bool) { var errStr *string if provErr != nil { s := provErr.Error() @@ -55,6 +58,15 @@ func emitAdmiraltyConsiders(executionsID, originID, secret string, provErr error Secret: secret, Error: errStr, }) + if self { + go tools.NewNATSCaller().SetNATSPub(tools.CONSIDERS_EVENT, tools.NATSResponse{ + FromApp: "oc-datacenter", + Datatype: tools.COMPUTE_RESOURCE, + Method: int(tools.CONSIDERS_EVENT), + Payload: payload, + }) + return + } b, _ := json.Marshal(&tools.PropalgationMessage{ DataType: tools.COMPUTE_RESOURCE.EnumIndex(), Action: tools.PB_CONSIDERS, @@ -83,41 +95,36 @@ func NewAdmiraltySetter(execIDS string) *AdmiraltySetter { // InitializeAsSource is called on the peer that acts as the SOURCE cluster (compute provider). // It creates the AdmiraltySource resource, generates a kubeconfig for the target peer, // and publishes it on NATS so the target peer can complete its side of the setup. 
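The `self` short-circuit added to `emitAdmiraltyConsiders` (and to the Minio equivalent below) avoids a needless propagation hop when the origin peer is the local one. A minimal, self-contained sketch of that routing split, using stand-in types for the oc-lib payload and envelope (the real ones are `tools.NATSResponse` and `tools.PropalgationMessage`, and the subjects are the `tools.CONSIDERS_EVENT` / `tools.PROPALGATION_EVENT` constants; JSON tags here are illustrative assumptions):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// considersPayload mirrors the fields marshalled by emitAdmiraltyConsiders;
// the JSON tags are assumptions, not the oc-lib definitions.
type considersPayload struct {
	OriginID     string  `json:"origin_id"`
	ExecutionsID string  `json:"executions_id"`
	Secret       string  `json:"secret"`
	Error        *string `json:"error,omitempty"`
}

// propagationEnvelope stands in for tools.PropalgationMessage.
type propagationEnvelope struct {
	Action  string          `json:"action"` // PB_CONSIDERS in this flow
	Payload json.RawMessage `json:"payload"`
}

// routeConsiders publishes directly on the CONSIDERS subject when the origin
// is the local peer, and wraps the payload for oc-discovery routing otherwise.
func routeConsiders(p considersPayload, self bool, publish func(subject string, body []byte)) error {
	body, err := json.Marshal(p)
	if err != nil {
		return err
	}
	if self {
		publish("CONSIDERS_EVENT", body) // no propagation hop needed
		return nil
	}
	env, err := json.Marshal(propagationEnvelope{Action: "PB_CONSIDERS", Payload: body})
	if err != nil {
		return err
	}
	publish("PROPALGATION_EVENT", env)
	return nil
}

func main() {
	pub := func(subject string, body []byte) { fmt.Println(subject, string(body)) }
	_ = routeConsiders(considersPayload{OriginID: "peer-a", ExecutionsID: "exec-1", Secret: "kcfg"}, true, pub)
	_ = routeConsiders(considersPayload{OriginID: "peer-b", ExecutionsID: "exec-1", Secret: "kcfg"}, false, pub)
}
```

As in the real code, the publish itself can run in a goroutine so the provisioning path never blocks on NATS.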
-func (s *AdmiraltySetter) InitializeAsSource(ctx context.Context, localPeerID string, destPeerID string, originID string) {
+func (s *AdmiraltySetter) InitializeAsSource(ctx context.Context, localPeerID string, destPeerID string, originID string, self bool) error {
 	logger := oclib.GetLogger()
 	serv, err := tools.NewKubernetesService(conf.GetConfig().KubeHost+":"+conf.GetConfig().KubePort, conf.GetConfig().KubeCA, conf.GetConfig().KubeCert, conf.GetConfig().KubeData)
 	if err != nil {
-		logger.Error().Msg("InitializeAsSource: failed to create service: " + err.Error())
-		return
+		return errors.New("InitializeAsSource: failed to create service: " + err.Error())
 	}
 	// Create the AdmiraltySource resource on this cluster (inlined from CreateAdmiraltySource controller)
 	logger.Info().Msg("Creating AdmiraltySource ns-" + s.ExecutionsID)
 	_, err = serv.CreateAdmiraltySource(ctx, s.ExecutionsID)
-	if err != nil && !apierrors.IsAlreadyExists(err) {
-		logger.Error().Msg("InitializeAsSource: failed to create source: " + err.Error())
-		return
+	if err != nil && !strings.Contains(err.Error(), "already exists") {
+		return errors.New("InitializeAsSource: failed to create source: " + err.Error())
 	}
 	// Generate a service-account token for the namespace (inlined from GetAdmiraltyKubeconfig controller)
 	token, err := serv.GenerateToken(ctx, s.ExecutionsID, 3600)
 	if err != nil {
-		logger.Error().Msg("InitializeAsSource: failed to generate token for ns-" + s.ExecutionsID + ": " + err.Error())
-		return
+		return errors.New("InitializeAsSource: failed to generate token for ns-" + s.ExecutionsID + ": " + err.Error())
 	}
 	kubeconfig, err := buildHostKubeWithToken(token)
 	if err != nil {
-		logger.Error().Msg("InitializeAsSource: " + err.Error())
-		return
+		return errors.New("InitializeAsSource: " + err.Error())
 	}
 	b, err := json.Marshal(kubeconfig)
 	if err != nil {
-		logger.Error().Msg("InitializeAsSource: failed to marshal kubeconfig: " + err.Error())
-		return
+		return errors.New("InitializeAsSource: failed to marshal kubeconfig: " + err.Error())
 	}
 	encodedKubeconfig := base64.StdEncoding.EncodeToString(b)
 	kube := KubeconfigEvent{
@@ -128,14 +135,14 @@ func (s *AdmiraltySetter) InitializeAsSource(ctx context.Context, localPeerID st
 		OriginID:     originID,
 	}
 	if destPeerID == localPeerID {
-		s.InitializeAsTarget(ctx, kube)
-		return
+		// Self case: source and target are the same cluster, no Admiralty target to configure.
+		emitAdmiraltyConsiders(s.ExecutionsID, originID, encodedKubeconfig, nil, true)
+		return nil
 	}
 	// Publish the kubeconfig on NATS so the target peer can proceed
 	payload, err := json.Marshal(kube)
 	if err != nil {
-		logger.Error().Msg("InitializeAsSource: failed to marshal kubeconfig event: " + err.Error())
-		return
+		return errors.New("InitializeAsSource: failed to marshal kubeconfig event: " + err.Error())
 	}

 	if b, err := json.Marshal(&tools.PropalgationMessage{
@@ -145,20 +152,22 @@ func (s *AdmiraltySetter) InitializeAsSource(ctx context.Context, localPeerID st
 	}); err == nil {
 		go tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{
 			FromApp:  "oc-datacenter",
-			Datatype: -1,
+			Datatype: tools.COMPUTE_RESOURCE,
 			User:     "",
 			Method:   int(tools.PROPALGATION_EVENT),
 			Payload:  b,
 		})
 	}
+	logger.Info().Msg("InitializeAsSource: kubeconfig published for ns-" + s.ExecutionsID)
+	return nil
 }

 // InitializeAsTarget is called on the peer that acts as the TARGET cluster (scheduler).
// It waits for the kubeconfig published by the source peer via NATS, then creates // the Secret, AdmiraltyTarget, and polls until the virtual node appears. -// kubeconfigCh must be obtained from RegisterKubeconfigWaiter before this goroutine starts. -func (s *AdmiraltySetter) InitializeAsTarget(ctx context.Context, kubeconfigObj KubeconfigEvent) { +// self must be true when the origin peer is the local peer (direct CONSIDERS_EVENT emission). +func (s *AdmiraltySetter) InitializeAsTarget(ctx context.Context, kubeconfigObj KubeconfigEvent, self bool) { logger := oclib.GetLogger() defer kubeconfigChannels.Delete(s.ExecutionsID) @@ -174,17 +183,17 @@ func (s *AdmiraltySetter) InitializeAsTarget(ctx context.Context, kubeconfigObj // 1. Create the namespace logger.Info().Msg("InitializeAsTarget: creating Namespace " + s.ExecutionsID) - if err := serv.CreateNamespace(ctx, s.ExecutionsID); err != nil && !apierrors.IsAlreadyExists(err) { + if err := serv.CreateNamespace(ctx, s.ExecutionsID); err != nil && !strings.Contains(err.Error(), "already exists") { logger.Error().Msg("InitializeAsTarget: failed to create namespace: " + err.Error()) - emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, "", err) + emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, "", err, self) return } // 2. Create the ServiceAccount sa-{executionID} logger.Info().Msg("InitializeAsTarget: creating ServiceAccount sa-" + s.ExecutionsID) - if err := serv.CreateServiceAccount(ctx, s.ExecutionsID); err != nil && !apierrors.IsAlreadyExists(err) { + if err := serv.CreateServiceAccount(ctx, s.ExecutionsID); err != nil && !strings.Contains(err.Error(), "already exists") { logger.Error().Msg("InitializeAsTarget: failed to create service account: " + err.Error()) - emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, "", err) + emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, "", err, self) return } @@ -204,18 +213,18 @@ func (s *AdmiraltySetter) InitializeAsTarget(ctx context.Context, kubeconfigObj {"get", "create", "update"}, {"get"}, {"patch"}}, - ); err != nil && !apierrors.IsAlreadyExists(err) { + ); err != nil && !strings.Contains(err.Error(), "already exists") { logger.Error().Msg("InitializeAsTarget: failed to create role: " + err.Error()) - emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, "", err) + emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, "", err, self) return } // 4. 
Create the RoleBinding rbName := "rb-" + s.ExecutionsID logger.Info().Msg("InitializeAsTarget: creating RoleBinding " + rbName) - if err := serv.CreateRoleBinding(ctx, s.ExecutionsID, rbName, roleName); err != nil && !apierrors.IsAlreadyExists(err) { + if err := serv.CreateRoleBinding(ctx, s.ExecutionsID, rbName, roleName); err != nil && !strings.Contains(err.Error(), "already exists") { logger.Error().Msg("InitializeAsTarget: failed to create role binding: " + err.Error()) - emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, "", err) + emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, "", err, self) return } @@ -223,7 +232,7 @@ func (s *AdmiraltySetter) InitializeAsTarget(ctx context.Context, kubeconfigObj logger.Info().Msg("InitializeAsTarget: creating Secret ns-" + s.ExecutionsID) if _, err := serv.CreateKubeconfigSecret(ctx, kubeconfigData, s.ExecutionsID, kubeconfigObj.SourcePeerID); err != nil { logger.Error().Msg("InitializeAsTarget: failed to create kubeconfig secret: " + err.Error()) - emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, "", err) + emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, "", err, self) return } @@ -235,14 +244,14 @@ func (s *AdmiraltySetter) InitializeAsTarget(ctx context.Context, kubeconfigObj if err == nil { err = fmt.Errorf("CreateAdmiraltyTarget returned nil response") } - emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, "", err) + emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, "", err, self) return } // Poll until the virtual node appears (inlined from GetNodeReady controller) logger.Info().Msg("InitializeAsTarget: waiting for virtual node ns-" + s.ExecutionsID) s.waitForNode(ctx, serv, kubeconfigObj.SourcePeerID) - emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, kubeconfigData, nil) + emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, kubeconfigData, nil, self) } // waitForNode polls GetOneNode until the Admiralty virtual node appears on this cluster. @@ -325,7 +334,11 @@ func buildHostKubeWithToken(token string) (*models.KubeConfigValue, error) { if len(token) == 0 { return nil, fmt.Errorf("buildHostKubeWithToken: empty token") } - encodedCA := base64.StdEncoding.EncodeToString([]byte(conf.GetConfig().KubeCA)) + apiHost := conf.GetConfig().KubeExternalHost + if apiHost == "" { + apiHost = conf.GetConfig().KubeHost + } + encodedCA := conf.GetConfig().KubeCA return &models.KubeConfigValue{ APIVersion: "v1", CurrentContext: "default", @@ -334,7 +347,7 @@ func buildHostKubeWithToken(token string) (*models.KubeConfigValue, error) { Clusters: []models.KubeconfigNamedCluster{{ Name: "default", Cluster: models.KubeconfigCluster{ - Server: "https://" + conf.GetConfig().KubeHost + ":6443", + Server: "https://" + apiHost + ":6443", CertificateAuthorityData: encodedCA, }, }}, diff --git a/infrastructure/booking_watchdog.go b/infrastructure/booking_watchdog.go index 1ce025a..92ab1cc 100644 --- a/infrastructure/booking_watchdog.go +++ b/infrastructure/booking_watchdog.go @@ -2,11 +2,13 @@ package infrastructure import ( "context" + "encoding/json" "fmt" "sync" "time" "oc-datacenter/infrastructure/minio" + "oc-datacenter/infrastructure/storage" oclib "cloud.o-forge.io/core/oc-lib" "cloud.o-forge.io/core/oc-lib/dbs" @@ -17,15 +19,10 @@ import ( "go.mongodb.org/mongo-driver/bson/primitive" ) -// processedBookings tracks booking IDs whose start-expiry has already been handled. -// Resets on restart; teardown methods are idempotent so duplicate runs are safe. 
+// processedBookings tracks booking IDs already handled this process lifetime. var processedBookings sync.Map -// processedEndBookings tracks booking IDs whose end-expiry (Admiralty source cleanup) -// has already been triggered in this process lifetime. -var processedEndBookings sync.Map - -// closingStates is the set of terminal booking states after which infra must be torn down. +// closingStates is the set of terminal booking states. var closingStates = map[enum.BookingStatus]bool{ enum.FAILURE: true, enum.SUCCESS: true, @@ -33,9 +30,12 @@ var closingStates = map[enum.BookingStatus]bool{ enum.CANCELLED: true, } -// WatchBookings starts a passive loop that ticks every minute, scans bookings whose -// ExpectedStartDate + 1 min has passed, transitions them to terminal states when needed, -// and tears down the associated Kubernetes / Minio infrastructure. +// WatchBookings is a safety-net fallback for when oc-monitord fails to launch. +// It detects bookings that are past expected_start_date by at least 1 minute and +// are still in a non-terminal state. Instead of writing to the database directly, +// it emits WORKFLOW_STEP_DONE_EVENT with State=FAILURE on NATS so that oc-scheduler +// handles the state transition — keeping a single source of truth for booking state. +// // Must be launched in a goroutine from main. func WatchBookings() { logger := oclib.GetLogger() @@ -43,18 +43,16 @@ func WatchBookings() { ticker := time.NewTicker(time.Minute) defer ticker.Stop() for range ticker.C { - if err := scanExpiredBookings(); err != nil { - logger.Error().Msg("BookingWatchdog: " + err.Error()) - } - if err := scanEndedExec(); err != nil { + if err := scanStaleBookings(); err != nil { logger.Error().Msg("BookingWatchdog: " + err.Error()) } } } -// scanExpiredBookings queries all bookings whose start deadline has passed and -// dispatches each one to processExpiredBooking. -func scanExpiredBookings() error { +// scanStaleBookings queries all bookings whose ExpectedStartDate passed more than +// 1 minute ago. Non-terminal ones get a WORKFLOW_STEP_DONE_EVENT FAILURE emitted +// on NATS so oc-scheduler closes them. +func scanStaleBookings() error { myself, err := oclib.GetMySelf() if err != nil { return fmt.Errorf("could not resolve local peer: %w", err) @@ -73,7 +71,7 @@ func scanExpiredBookings() error { }, "", false) if res.Err != "" { - return fmt.Errorf("booking search failed: %s", res.Err) + return fmt.Errorf("stale booking search failed: %s", res.Err) } for _, dbo := range res.Data { @@ -81,164 +79,162 @@ func scanExpiredBookings() error { if !ok { continue } - go processExpiredBooking(b, peerID) + go emitWatchdogFailure(b) } return nil } -// processExpiredBooking transitions the booking to a terminal state when applicable, -// then tears down infrastructure based on the resource type: -// - LIVE_DATACENTER / COMPUTE_RESOURCE → Admiralty (as target) + Minio (as target) -// - LIVE_STORAGE / STORAGE_RESOURCE → Minio (as source) -func processExpiredBooking(b *bookingmodel.Booking, peerID string) { +// emitWatchdogFailure publishes a WORKFLOW_STEP_DONE_EVENT FAILURE for a stale +// booking. oc-scheduler is the single authority for booking state transitions. +func emitWatchdogFailure(b *bookingmodel.Booking) { logger := oclib.GetLogger() - ctx := context.Background() - // Skip bookings already handled during this process lifetime. if _, done := processedBookings.Load(b.GetID()); done { return } - - // Transition non-terminal bookings. 
- if !closingStates[b.State] { - var newState enum.BookingStatus - switch b.State { - case enum.DRAFT, enum.DELAYED: - // DRAFT: never launched; DELAYED: was SCHEDULED but start never arrived. - newState = enum.FORGOTTEN - case enum.SCHEDULED: - // Passed its start date without ever being launched. - newState = enum.FAILURE - case enum.STARTED: - // A running booking is never auto-closed by the watchdog. - return - default: - return - } - - upd := oclib.NewRequest(oclib.LibDataEnum(oclib.BOOKING), "", peerID, []string{}, nil). - UpdateOne(map[string]any{"state": newState.EnumIndex()}, b.GetID()) - if upd.Err != "" { - logger.Error().Msgf("BookingWatchdog: failed to update booking %s: %s", b.GetID(), upd.Err) - return - } - b.State = newState - logger.Info().Msgf("BookingWatchdog: booking %s (exec=%s, type=%s) → %s", - b.GetID(), b.ExecutionsID, b.ResourceType, b.State) + if closingStates[b.State] { + processedBookings.Store(b.GetID(), struct{}{}) + return } - // Mark as handled before triggering async teardown (avoids double-trigger on next tick). + now := time.Now().UTC() + payload, err := json.Marshal(tools.WorkflowLifecycleEvent{ + BookingID: b.GetID(), + State: enum.FAILURE.EnumIndex(), + RealEnd: &now, + }) + if err != nil { + return + } + tools.NewNATSCaller().SetNATSPub(tools.WORKFLOW_STEP_DONE_EVENT, tools.NATSResponse{ + FromApp: "oc-datacenter", + Method: int(tools.WORKFLOW_STEP_DONE_EVENT), + Payload: payload, + }) + + logger.Info().Msgf("BookingWatchdog: booking %s stale → emitting FAILURE", b.GetID()) processedBookings.Store(b.GetID(), struct{}{}) - - // Tear down infrastructure according to resource type. - switch b.ResourceType { - case tools.LIVE_DATACENTER, tools.COMPUTE_RESOURCE: - logger.Info().Msgf("BookingWatchdog: tearing down compute infra exec=%s", b.ExecutionsID) - go NewAdmiraltySetter(b.ExecutionsID).TeardownAsSource(ctx) // i'm the compute units. - go teardownMinioForComputeBooking(ctx, b, peerID) - - case tools.LIVE_STORAGE, tools.STORAGE_RESOURCE: - logger.Info().Msgf("BookingWatchdog: tearing down storage infra exec=%s", b.ExecutionsID) - go teardownMinioSourceBooking(ctx, b, peerID) - } } -// scanEndedBookings queries LIVE_DATACENTER / COMPUTE_RESOURCE bookings whose -// ExpectedEndDate + 1 min has passed and triggers TeardownAsSource for Admiralty, -// cleaning up the compute-side namespace once the execution window is over. -func scanEndedExec() error { - myself, err := oclib.GetMySelf() - if err != nil { - return fmt.Errorf("could not resolve local peer: %w", err) - } - peerID := myself.GetID() - res := oclib.NewRequest(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), "", peerID, []string{}, nil). +// ── Infra teardown helpers (called from nats.go on WORKFLOW_DONE_EVENT) ──────── + +// teardownAdmiraltyIfRemote triggers Admiralty TeardownAsTarget only when at +// least one compute booking for the execution is on a remote peer. +// Local executions do not involve Admiralty. +func teardownAdmiraltyIfRemote(exec *workflow_execution.WorkflowExecution, selfPeerID string) { + logger := oclib.GetLogger() + + res := oclib.NewRequest(oclib.LibDataEnum(oclib.BOOKING), "", selfPeerID, []string{}, nil). Search(&dbs.Filters{ And: map[string][]dbs.Filter{ - // Only compute bookings require Admiralty source cleanup. 
- "state": {{ - Operator: dbs.GT.String(), - Value: 2, - }}, + "executions_id": {{Operator: dbs.EQUAL.String(), Value: exec.ExecutionsID}}, + "resource_type": {{Operator: dbs.EQUAL.String(), Value: tools.COMPUTE_RESOURCE.EnumIndex()}}, }, }, "", false) - if res.Err != "" { - return fmt.Errorf("ended-booking search failed: %s", res.Err) + if res.Err != "" || len(res.Data) == 0 { + return } for _, dbo := range res.Data { - b, ok := dbo.(*workflow_execution.WorkflowExecution) + b, ok := dbo.(*bookingmodel.Booking) if !ok { continue } - go teardownAdmiraltyTarget(b) - } - return nil -} - -// teardownAdmiraltySource triggers TeardownAsSource for the compute-side namespace -// of an execution whose expected end date has passed. -func teardownAdmiraltyTarget(b *workflow_execution.WorkflowExecution) { - logger := oclib.GetLogger() - - // Each executionsID is processed at most once per process lifetime. - if _, done := processedEndBookings.Load(b.ExecutionsID); done { - return - } - processedEndBookings.Store(b.ExecutionsID, struct{}{}) - - logger.Info().Msgf("BookingWatchdog: tearing down Admiralty source exec=%s (booking=%s)", - b.ExecutionsID, b.GetID()) - if p, err := oclib.GetMySelf(); err == nil { - NewAdmiraltySetter(b.ExecutionsID).TeardownAsTarget(context.Background(), p.GetID()) + if b.DestPeerID != selfPeerID { + logger.Info().Msgf("InfraTeardown: Admiralty teardown exec=%s (remote peer=%s)", + exec.ExecutionsID, b.DestPeerID) + NewAdmiraltySetter(exec.ExecutionsID).TeardownAsTarget(context.Background(), selfPeerID) + return // one teardown per execution is enough + } } } -// teardownMinioForComputeBooking finds the LIVE_STORAGE bookings belonging to the same -// execution and triggers Minio-as-target teardown for each (K8s secret + configmap). -// The Minio-as-source side is handled separately by the storage booking's own watchdog pass. -func teardownMinioForComputeBooking(ctx context.Context, computeBooking *bookingmodel.Booking, localPeerID string) { +// teardownMinioForExecution tears down all Minio configuration for the execution: +// - storage bookings where this peer is the compute target → TeardownAsTarget +// - storage bookings where this peer is the Minio source → TeardownAsSource +func teardownMinioForExecution(ctx context.Context, executionsID string, localPeerID string) { logger := oclib.GetLogger() res := oclib.NewRequest(oclib.LibDataEnum(oclib.BOOKING), "", localPeerID, []string{}, nil). Search(&dbs.Filters{ And: map[string][]dbs.Filter{ - "executions_id": {{Operator: dbs.EQUAL.String(), Value: computeBooking.ExecutionsID}}, + "executions_id": {{Operator: dbs.EQUAL.String(), Value: executionsID}}, "resource_type": {{Operator: dbs.EQUAL.String(), Value: tools.LIVE_STORAGE.EnumIndex()}}, }, }, "", false) if res.Err != "" || len(res.Data) == 0 { - logger.Warn().Msgf("BookingWatchdog: no storage booking found for exec=%s", computeBooking.ExecutionsID) return } for _, dbo := range res.Data { - sb, ok := dbo.(*bookingmodel.Booking) + b, ok := dbo.(*bookingmodel.Booking) if !ok { continue } - event := minio.MinioDeleteEvent{ - ExecutionsID: computeBooking.ExecutionsID, - MinioID: sb.ResourceID, - SourcePeerID: sb.DestPeerID, // peer hosting Minio - DestPeerID: localPeerID, // this peer (compute/target) - OriginID: "", + if b.DestPeerID == localPeerID { + // This peer is the compute target: tear down K8s secret + configmap. 
+ logger.Info().Msgf("InfraTeardown: Minio target teardown exec=%s storage=%s", executionsID, b.ResourceID) + event := minio.MinioDeleteEvent{ + ExecutionsID: executionsID, + MinioID: b.ResourceID, + SourcePeerID: b.DestPeerID, + DestPeerID: localPeerID, + OriginID: "", + } + minio.NewMinioSetter(executionsID, b.ResourceID).TeardownAsTarget(ctx, event) + } else { + // This peer is the Minio source: revoke SA + remove execution bucket. + logger.Info().Msgf("InfraTeardown: Minio source teardown exec=%s storage=%s", executionsID, b.ResourceID) + event := minio.MinioDeleteEvent{ + ExecutionsID: executionsID, + MinioID: b.ResourceID, + SourcePeerID: localPeerID, + DestPeerID: b.DestPeerID, + OriginID: "", + } + minio.NewMinioSetter(executionsID, b.ResourceID).TeardownAsSource(ctx, event) } - minio.NewMinioSetter(computeBooking.ExecutionsID, sb.ResourceID).TeardownAsTarget(ctx, event) } } -// teardownMinioSourceBooking triggers Minio-as-source teardown for a storage booking: -// revokes the scoped service account and removes the execution bucket on this Minio host. -func teardownMinioSourceBooking(ctx context.Context, b *bookingmodel.Booking, localPeerID string) { - event := minio.MinioDeleteEvent{ - ExecutionsID: b.ExecutionsID, - MinioID: b.ResourceID, - SourcePeerID: localPeerID, // this peer IS the Minio host - DestPeerID: b.DestPeerID, - OriginID: "", +// teardownPVCForExecution deletes all local PVCs provisioned for the execution. +// It searches LIVE_STORAGE bookings and resolves the storage name via the live storage. +func teardownPVCForExecution(ctx context.Context, executionsID string, localPeerID string) { + logger := oclib.GetLogger() + + res := oclib.NewRequest(oclib.LibDataEnum(oclib.BOOKING), "", localPeerID, []string{}, nil). + Search(&dbs.Filters{ + And: map[string][]dbs.Filter{ + "executions_id": {{Operator: dbs.EQUAL.String(), Value: executionsID}}, + "resource_type": {{Operator: dbs.EQUAL.String(), Value: tools.LIVE_STORAGE.EnumIndex()}}, + }, + }, "", false) + + if res.Err != "" || len(res.Data) == 0 { + return + } + + for _, dbo := range res.Data { + b, ok := dbo.(*bookingmodel.Booking) + if !ok { + continue + } + // Resolve storage name from live storage to compute the claim name. 
+ storageName := storage.ResolveStorageName(b.ResourceID, localPeerID) + if storageName == "" { + continue + } + logger.Info().Msgf("InfraTeardown: PVC teardown exec=%s storage=%s", executionsID, b.ResourceID) + event := storage.PVCDeleteEvent{ + ExecutionsID: executionsID, + StorageID: b.ResourceID, + StorageName: storageName, + SourcePeerID: localPeerID, + DestPeerID: b.DestPeerID, + OriginID: "", + } + storage.NewPVCSetter(executionsID, b.ResourceID).TeardownAsSource(ctx, event) } - minio.NewMinioSetter(b.ExecutionsID, b.ResourceID).TeardownAsSource(ctx, event) } diff --git a/infrastructure/infra_watchdog.go b/infrastructure/infra_watchdog.go new file mode 100644 index 0000000..1a60a4f --- /dev/null +++ b/infrastructure/infra_watchdog.go @@ -0,0 +1,331 @@ +package infrastructure + +import ( + "context" + "fmt" + "regexp" + "strings" + "time" + + "oc-datacenter/conf" + "oc-datacenter/infrastructure/minio" + "oc-datacenter/infrastructure/storage" + + oclib "cloud.o-forge.io/core/oc-lib" + "cloud.o-forge.io/core/oc-lib/dbs" + bookingmodel "cloud.o-forge.io/core/oc-lib/models/booking" + "cloud.o-forge.io/core/oc-lib/models/workflow_execution" + "cloud.o-forge.io/core/oc-lib/tools" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// uuidNsPattern matches Kubernetes namespace names that are execution UUIDs. +var uuidNsPattern = regexp.MustCompile(`^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$`) + +// WatchInfra is a safety-net watchdog that periodically scans Kubernetes for +// execution namespaces whose WorkflowExecution has reached a terminal state +// but whose infra was never torn down (e.g. because WORKFLOW_DONE_EVENT was +// missed due to oc-monitord or oc-datacenter crash/restart). +// +// Must be launched in a goroutine from main. +func WatchInfra() { + logger := oclib.GetLogger() + logger.Info().Msg("InfraWatchdog: started") + ticker := time.NewTicker(5 * time.Minute) + defer ticker.Stop() + for range ticker.C { + if err := scanOrphanedInfra(); err != nil { + logger.Error().Msg("InfraWatchdog: " + err.Error()) + } + if err := scanOrphanedMinio(); err != nil { + logger.Error().Msg("InfraWatchdog(minio): " + err.Error()) + } + if err := scanOrphanedAdmiraltyNodes(); err != nil { + logger.Error().Msg("InfraWatchdog(admiralty-nodes): " + err.Error()) + } + if err := scanOrphanedPVC(); err != nil { + logger.Error().Msg("InfraWatchdog(pvc): " + err.Error()) + } + } +} + +// scanOrphanedInfra lists all UUID-named Kubernetes namespaces, looks up their +// WorkflowExecution in the DB, and triggers teardown for any that are in a +// terminal state. Namespaces already in Terminating phase are skipped. 
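Everything this watchdog touches is gated by `uuidNsPattern`: a namespace participates only if its name is a lowercase, hyphenated hex UUID, so built-in namespaces such as kube-system can never be selected for teardown. A runnable check, with the pattern copied verbatim from this file:

```go
package main

import (
	"fmt"
	"regexp"
)

// Copied from infra_watchdog.go: only lowercase hex UUIDs in 8-4-4-4-12 form match.
var uuidNsPattern = regexp.MustCompile(`^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$`)

func main() {
	for _, ns := range []string{
		"9f1c2a3b-4d5e-6f70-8192-a3b4c5d6e7f8", // execution namespace: matched
		"kube-system",                          // system namespace: skipped
		"9F1C2A3B-4D5E-6F70-8192-A3B4C5D6E7F8", // uppercase: skipped
		"admiralty-9f1c2a3b-4d5e-6f70-8192-a3b4c5d6e7f8-target-x", // node name, not a bare UUID: skipped
	} {
		fmt.Printf("%-58s match=%v\n", ns, uuidNsPattern.MatchString(ns))
	}
}
```

scanOrphanedAdmiraltyNodes reuses the same pattern after stripping the `admiralty-` prefix and slicing to 36 characters, which is why the last example above only matches once truncated.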
+func scanOrphanedInfra() error { + logger := oclib.GetLogger() + + serv, err := tools.NewKubernetesService( + conf.GetConfig().KubeHost+":"+conf.GetConfig().KubePort, + conf.GetConfig().KubeCA, + conf.GetConfig().KubeCert, + conf.GetConfig().KubeData, + ) + if err != nil { + return fmt.Errorf("failed to init k8s service: %w", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + nsList, err := serv.Set.CoreV1().Namespaces().List(ctx, metav1.ListOptions{}) + if err != nil { + return fmt.Errorf("failed to list namespaces: %w", err) + } + + myself, err := oclib.GetMySelf() + if err != nil { + return fmt.Errorf("could not resolve local peer: %w", err) + } + peerID := myself.GetID() + + for _, ns := range nsList.Items { + executionsID := ns.Name + if !uuidNsPattern.MatchString(executionsID) { + continue + } + // Skip namespaces already being deleted by a previous teardown. + if ns.Status.Phase == v1.NamespaceTerminating { + continue + } + + exec := findTerminalExecution(executionsID, peerID) + if exec == nil { + continue + } + + logger.Info().Msgf("InfraWatchdog: orphaned infra detected for execution %s (state=%v) → teardown", + executionsID, exec.State) + go teardownInfraForExecution(exec.GetID(), executionsID) + } + return nil +} + +// scanOrphanedMinio scans LIVE_STORAGE bookings for executions that are in a +// terminal state and triggers Minio teardown for each unique executionsID found. +// This covers the case where the Kubernetes namespace is already gone (manual +// deletion, prior partial teardown) but Minio SA and bucket were never revoked. +func scanOrphanedMinio() error { + logger := oclib.GetLogger() + + myself, err := oclib.GetMySelf() + if err != nil { + return fmt.Errorf("could not resolve local peer: %w", err) + } + peerID := myself.GetID() + + res := oclib.NewRequest(oclib.LibDataEnum(oclib.BOOKING), "", peerID, []string{}, nil). + Search(&dbs.Filters{ + And: map[string][]dbs.Filter{ + "resource_type": {{Operator: dbs.EQUAL.String(), Value: tools.LIVE_STORAGE.EnumIndex()}}, + }, + }, "", false) + + if res.Err != "" { + return fmt.Errorf("failed to search LIVE_STORAGE bookings: %s", res.Err) + } + + // Collect unique executionsIDs to avoid redundant teardowns. + seen := map[string]bool{} + ctx := context.Background() + + for _, dbo := range res.Data { + b, ok := dbo.(*bookingmodel.Booking) + if !ok || seen[b.ExecutionsID] { + continue + } + + exec := findTerminalExecution(b.ExecutionsID, peerID) + if exec == nil { + continue + } + + seen[b.ExecutionsID] = true + + // Determine this peer's role and call the appropriate teardown. 
+ if b.DestPeerID == peerID { + logger.Info().Msgf("InfraWatchdog(minio): orphaned target resources for exec %s → TeardownAsTarget", b.ExecutionsID) + event := minio.MinioDeleteEvent{ + ExecutionsID: b.ExecutionsID, + MinioID: b.ResourceID, + SourcePeerID: b.DestPeerID, + DestPeerID: peerID, + } + go minio.NewMinioSetter(b.ExecutionsID, b.ResourceID).TeardownAsTarget(ctx, event) + } else { + logger.Info().Msgf("InfraWatchdog(minio): orphaned source resources for exec %s → TeardownAsSource", b.ExecutionsID) + event := minio.MinioDeleteEvent{ + ExecutionsID: b.ExecutionsID, + MinioID: b.ResourceID, + SourcePeerID: peerID, + DestPeerID: b.DestPeerID, + } + go minio.NewMinioSetter(b.ExecutionsID, b.ResourceID).TeardownAsSource(ctx, event) + } + } + return nil +} + +// scanOrphanedAdmiraltyNodes lists all Kubernetes nodes, identifies Admiralty +// virtual nodes (name prefix "admiralty-{UUID}-") that are NotReady, and +// explicitly deletes them when their WorkflowExecution is in a terminal state. +// +// This covers the gap where the namespace is already gone (or Terminating) but +// the virtual node was never cleaned up by the Admiralty controller — which can +// happen when the node goes NotReady before the AdmiraltyTarget CRD is deleted. +func scanOrphanedAdmiraltyNodes() error { + logger := oclib.GetLogger() + + serv, err := tools.NewKubernetesService( + conf.GetConfig().KubeHost+":"+conf.GetConfig().KubePort, + conf.GetConfig().KubeCA, + conf.GetConfig().KubeCert, + conf.GetConfig().KubeData, + ) + if err != nil { + return fmt.Errorf("failed to init k8s service: %w", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + nodeList, err := serv.Set.CoreV1().Nodes().List(ctx, metav1.ListOptions{}) + if err != nil { + return fmt.Errorf("failed to list nodes: %w", err) + } + + myself, err := oclib.GetMySelf() + if err != nil { + return fmt.Errorf("could not resolve local peer: %w", err) + } + peerID := myself.GetID() + + for _, node := range nodeList.Items { + // Admiralty virtual nodes are named: admiralty-{executionID}-target-{...} + rest := strings.TrimPrefix(node.Name, "admiralty-") + if rest == node.Name { + continue // not an admiralty node + } + // UUID is exactly 36 chars: 8-4-4-4-12 + if len(rest) < 36 { + continue + } + executionsID := rest[:36] + if !uuidNsPattern.MatchString(executionsID) { + continue + } + + // Only act on NotReady nodes. + ready := false + for _, cond := range node.Status.Conditions { + if cond.Type == v1.NodeReady { + ready = cond.Status == v1.ConditionTrue + break + } + } + if ready { + continue + } + + exec := findTerminalExecution(executionsID, peerID) + if exec == nil { + continue + } + + logger.Info().Msgf("InfraWatchdog(admiralty-nodes): NotReady orphaned node %s for terminal execution %s → deleting", + node.Name, executionsID) + if delErr := serv.Set.CoreV1().Nodes().Delete(ctx, node.Name, metav1.DeleteOptions{}); delErr != nil { + logger.Error().Msgf("InfraWatchdog(admiralty-nodes): failed to delete node %s: %v", node.Name, delErr) + } + } + return nil +} + +// scanOrphanedPVC scans LIVE_STORAGE bookings for executions that are in a +// terminal state and triggers PVC teardown for each one where this peer holds +// the local storage. This covers the case where the Kubernetes namespace was +// already deleted (or its teardown was partial) but the PersistentVolume +// (cluster-scoped) was never reclaimed. 
+// +// A LIVE_STORAGE booking is treated as a local PVC only when ResolveStorageName +// returns a non-empty name — the same guard used by teardownPVCForExecution. +func scanOrphanedPVC() error { + logger := oclib.GetLogger() + + myself, err := oclib.GetMySelf() + if err != nil { + return fmt.Errorf("could not resolve local peer: %w", err) + } + peerID := myself.GetID() + + res := oclib.NewRequest(oclib.LibDataEnum(oclib.BOOKING), "", peerID, []string{}, nil). + Search(&dbs.Filters{ + And: map[string][]dbs.Filter{ + "resource_type": {{Operator: dbs.EQUAL.String(), Value: tools.LIVE_STORAGE.EnumIndex()}}, + }, + }, "", false) + + if res.Err != "" { + return fmt.Errorf("failed to search LIVE_STORAGE bookings: %s", res.Err) + } + + seen := map[string]bool{} + ctx := context.Background() + + for _, dbo := range res.Data { + b, ok := dbo.(*bookingmodel.Booking) + if !ok || seen[b.ExecutionsID+b.ResourceID] { + continue + } + + storageName := storage.ResolveStorageName(b.ResourceID, peerID) + if storageName == "" { + continue // not a local PVC booking + } + + exec := findTerminalExecution(b.ExecutionsID, peerID) + if exec == nil { + continue + } + + seen[b.ExecutionsID+b.ResourceID] = true + + logger.Info().Msgf("InfraWatchdog(pvc): orphaned PVC for exec %s storage %s → TeardownAsSource", + b.ExecutionsID, b.ResourceID) + event := storage.PVCDeleteEvent{ + ExecutionsID: b.ExecutionsID, + StorageID: b.ResourceID, + StorageName: storageName, + SourcePeerID: peerID, + DestPeerID: b.DestPeerID, + } + go storage.NewPVCSetter(b.ExecutionsID, b.ResourceID).TeardownAsSource(ctx, event) + } + return nil +} + +// findTerminalExecution returns the WorkflowExecution for the given executionsID +// if it exists in the DB and is in a terminal state, otherwise nil. +func findTerminalExecution(executionsID string, peerID string) *workflow_execution.WorkflowExecution { + res := oclib.NewRequest(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), "", peerID, []string{}, nil). + Search(&dbs.Filters{ + And: map[string][]dbs.Filter{ + "executions_id": {{Operator: dbs.EQUAL.String(), Value: executionsID}}, + }, + }, "", false) + + if res.Err != "" || len(res.Data) == 0 { + return nil + } + + exec, ok := res.Data[0].(*workflow_execution.WorkflowExecution) + if !ok { + return nil + } + + if !closingStates[exec.State] { + return nil + } + return exec +} diff --git a/infrastructure/minio/minio_setter.go b/infrastructure/minio/minio_setter.go index 2f9404f..af1cf2f 100644 --- a/infrastructure/minio/minio_setter.go +++ b/infrastructure/minio/minio_setter.go @@ -44,7 +44,10 @@ type minioConsidersPayload struct { // emitConsiders publishes a PB_CONSIDERS back to OriginID with the result of // the minio provisioning. secret is the provisioned credential; err is nil on success. -func emitConsiders(executionsID, originID, secret string, provErr error) { +// When self is true the origin is the local peer: emits directly on CONSIDERS_EVENT +// instead of routing through PROPALGATION_EVENT. 
+func emitConsiders(executionsID, originID, secret string, provErr error, self bool) { + fmt.Println("emitConsiders !") var errStr *string if provErr != nil { s := provErr.Error() @@ -56,6 +59,15 @@ func emitConsiders(executionsID, originID, secret string, provErr error) { Secret: secret, Error: errStr, }) + if self { + go tools.NewNATSCaller().SetNATSPub(tools.CONSIDERS_EVENT, tools.NATSResponse{ + FromApp: "oc-datacenter", + Datatype: tools.STORAGE_RESOURCE, + Method: int(tools.CONSIDERS_EVENT), + Payload: payload, + }) + return + } b, _ := json.Marshal(&tools.PropalgationMessage{ DataType: tools.STORAGE_RESOURCE.EnumIndex(), Action: tools.PB_CONSIDERS, @@ -88,7 +100,7 @@ func NewMinioSetter(execID, minioID string) *MinioSetter { // 4. If source and dest are the same peer, calls InitializeAsTarget directly. // Otherwise, publishes a MinioCredentialEvent via NATS (Phase 2) so that // oc-discovery can route the credentials to the compute peer. -func (m *MinioSetter) InitializeAsSource(ctx context.Context, localPeerID, destPeerID, originID string) { +func (m *MinioSetter) InitializeAsSource(ctx context.Context, localPeerID, destPeerID, originID string, self bool) { logger := oclib.GetLogger() url, err := m.loadMinioURL(localPeerID) @@ -128,7 +140,7 @@ func (m *MinioSetter) InitializeAsSource(ctx context.Context, localPeerID, destP if destPeerID == localPeerID { // Same peer: store the secret locally without going through NATS. - m.InitializeAsTarget(ctx, event) + m.InitializeAsTarget(ctx, event, true) return } @@ -138,7 +150,6 @@ func (m *MinioSetter) InitializeAsSource(ctx context.Context, localPeerID, destP logger.Error().Msg("MinioSetter.InitializeAsSource: failed to marshal credential event: " + err.Error()) return } - if b, err := json.Marshal(&tools.PropalgationMessage{ DataType: -1, Action: tools.PB_MINIO_CONFIG, @@ -146,20 +157,23 @@ func (m *MinioSetter) InitializeAsSource(ctx context.Context, localPeerID, destP }); err == nil { go tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{ FromApp: "oc-datacenter", - Datatype: -1, + Datatype: tools.STORAGE_RESOURCE, User: "", Method: int(tools.PROPALGATION_EVENT), Payload: b, }) logger.Info().Msg("MinioSetter.InitializeAsSource: credentials published via NATS for " + m.ExecutionsID) } + } // InitializeAsTarget is called on the peer that runs the compute workload. // // It stores the Minio credentials received from the source peer (via NATS or directly) // as a Kubernetes secret inside the execution namespace, making them available to pods. -func (m *MinioSetter) InitializeAsTarget(ctx context.Context, event MinioCredentialEvent) { +// self must be true when the origin peer is the local peer (direct CONSIDERS_EVENT emission). 
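On the receiving side, the MINIO config NATS handler (further down, in nats.go) tells the two phases of this exchange apart purely by whether `Access` is filled: empty means Phase 1 (this peer is the Minio source and must mint credentials), non-empty means Phase 2 (this peer is the compute target and stores them as a Kubernetes secret). A sketch of that dispatch, with a local stand-in for `minio.MinioCredentialEvent` (field set taken from this diff, JSON tags assumed):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Stand-in for minio.MinioCredentialEvent; fields follow the diff, tags are assumptions.
type minioCredentialEvent struct {
	ExecutionsID string `json:"executions_id"`
	MinioID      string `json:"minio_id"`
	SourcePeerID string `json:"source_peer_id"`
	DestPeerID   string `json:"dest_peer_id"`
	OriginID     string `json:"origin_id"`
	Access       string `json:"access,omitempty"`
	Secret       string `json:"secret,omitempty"`
	URL          string `json:"url,omitempty"`
}

// phase mirrors the handler's branch: empty Access → Phase 1 (source side mints
// credentials), filled Access → Phase 2 (target side stores them in k8s).
func phase(raw []byte) (string, error) {
	var evt minioCredentialEvent
	if err := json.Unmarshal(raw, &evt); err != nil {
		return "", err
	}
	if evt.Access == "" {
		return "phase 1: InitializeAsSource", nil
	}
	return "phase 2: InitializeAsTarget", nil
}

func main() {
	p1, _ := json.Marshal(minioCredentialEvent{ExecutionsID: "exec-1", MinioID: "m-1"})
	p2, _ := json.Marshal(minioCredentialEvent{ExecutionsID: "exec-1", MinioID: "m-1", Access: "ak", Secret: "sk"})
	for _, p := range [][]byte{p1, p2} {
		out, _ := phase(p)
		fmt.Println(out)
	}
}
```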
+func (m *MinioSetter) InitializeAsTarget(ctx context.Context, event MinioCredentialEvent, self bool) { + fmt.Println("InitializeAsTarget is Self :", self) logger := oclib.GetLogger() k, err := tools.NewKubernetesService( @@ -173,18 +187,18 @@ func (m *MinioSetter) InitializeAsTarget(ctx context.Context, event MinioCredent if err := k.CreateSecret(ctx, event.MinioID, event.ExecutionsID, event.Access, event.Secret); err != nil { logger.Error().Msg("MinioSetter.InitializeAsTarget: failed to create k8s secret: " + err.Error()) - emitConsiders(event.ExecutionsID, event.OriginID, "", err) + emitConsiders(event.ExecutionsID, event.OriginID, "", err, self) return } - if err := NewMinioService(event.URL).CreateMinioConfigMap(event.MinioID, event.ExecutionsID, event.URL); err == nil { + if err := NewMinioService(event.URL).CreateMinioConfigMap(event.MinioID, event.ExecutionsID, event.URL); err != nil { logger.Error().Msg("MinioSetter.InitializeAsTarget: failed to create config map: " + err.Error()) - emitConsiders(event.ExecutionsID, event.OriginID, "", err) + emitConsiders(event.ExecutionsID, event.OriginID, "", err, self) return } logger.Info().Msg("MinioSetter.InitializeAsTarget: Minio credentials stored in namespace " + event.ExecutionsID) - emitConsiders(event.ExecutionsID, event.OriginID, event.Secret, nil) + emitConsiders(event.ExecutionsID, event.OriginID, event.Secret, nil, self) } // MinioDeleteEvent is the NATS payload used to tear down Minio resources. @@ -213,7 +227,7 @@ func (m *MinioSetter) TeardownAsTarget(ctx context.Context, event MinioDeleteEve ) if err != nil { logger.Error().Msg("MinioSetter.TeardownAsTarget: failed to create k8s service: " + err.Error()) - emitConsiders(event.ExecutionsID, event.OriginID, "", err) + emitConsiders(event.ExecutionsID, event.OriginID, "", err, event.SourcePeerID == event.DestPeerID) return } diff --git a/infrastructure/namespace.go b/infrastructure/namespace.go index 8dcaece..afec5de 100644 --- a/infrastructure/namespace.go +++ b/infrastructure/namespace.go @@ -3,6 +3,7 @@ package infrastructure import ( "context" "oc-datacenter/conf" + "time" oclib "cloud.o-forge.io/core/oc-lib" "cloud.o-forge.io/core/oc-lib/tools" @@ -21,5 +22,7 @@ func CreateNamespace(ns string) error { logger.Error().Msg("CreateNamespace: failed to init k8s service: " + err.Error()) return err } - return serv.ProvisionExecutionNamespace(context.Background(), ns) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + return serv.ProvisionExecutionNamespace(ctx, ns) } diff --git a/infrastructure/nats.go b/infrastructure/nats.go index 54879b7..1f8ac24 100644 --- a/infrastructure/nats.go +++ b/infrastructure/nats.go @@ -5,24 +5,57 @@ import ( "encoding/json" "fmt" "oc-datacenter/infrastructure/minio" + "oc-datacenter/infrastructure/storage" "sync" + oclib "cloud.o-forge.io/core/oc-lib" + "cloud.o-forge.io/core/oc-lib/models/workflow_execution" "cloud.o-forge.io/core/oc-lib/tools" ) // roleWaiters maps executionID → channel expecting the role-assignment message from OC discovery. var roleWaiters sync.Map +// teardownInfraForExecution handles infrastructure cleanup when a workflow terminates. +// oc-datacenter is responsible only for infra here — booking/execution state +// is managed by oc-scheduler. 
+func teardownInfraForExecution(executionID string, executionsID string) { + logger := oclib.GetLogger() + + myself, err := oclib.GetMySelf() + if err != nil || myself == nil { + return + } + selfPeerID := myself.GetID() + + adminReq := &tools.APIRequest{Admin: true} + res, _, loadErr := workflow_execution.NewAccessor(adminReq).LoadOne(executionID) + if loadErr != nil || res == nil { + logger.Warn().Msgf("teardownInfraForExecution: execution %s not found", executionID) + return + } + exec := res.(*workflow_execution.WorkflowExecution) + + ctx := context.Background() + teardownAdmiraltyIfRemote(exec, selfPeerID) + teardownMinioForExecution(ctx, executionsID, selfPeerID) + teardownPVCForExecution(ctx, executionsID, selfPeerID) +} + // ArgoKubeEvent carries the peer-routing metadata for a resource provisioning event. // -// When MinioID is non-empty the event concerns Minio credential provisioning; -// otherwise it concerns Admiralty kubeconfig provisioning. +// When MinioID is non-empty and Local is false, the event concerns Minio credential provisioning. +// When Local is true, the event concerns local PVC provisioning. +// Otherwise it concerns Admiralty kubeconfig provisioning. type ArgoKubeEvent struct { ExecutionsID string `json:"executions_id"` DestPeerID string `json:"dest_peer_id"` Type tools.DataType `json:"data_type"` SourcePeerID string `json:"source_peer_id"` MinioID string `json:"minio_id,omitempty"` + // Local signals that this STORAGE_RESOURCE event is for a local PVC (not Minio). + Local bool `json:"local,omitempty"` + StorageName string `json:"storage_name,omitempty"` // OriginID is the peer that initiated the request; the PB_CONSIDERS // response is routed back to this peer once provisioning completes. OriginID string `json:"origin_id,omitempty"` @@ -34,7 +67,7 @@ func ListenNATS() { tools.NewNATSCaller().ListenNats(map[tools.NATSMethod]func(tools.NATSResponse){ // ─── ARGO_KUBE_EVENT ──────────────────────────────────────────────────────── // Triggered by oc-discovery to notify this peer of a provisioning task. - // Dispatches to Admiralty or Minio based on whether MinioID is set. + // Dispatches to Admiralty, Minio, or local PVC based on event fields. tools.ARGO_KUBE_EVENT: func(resp tools.NATSResponse) { argo := &ArgoKubeEvent{} if err := json.Unmarshal(resp.Payload, argo); err != nil { @@ -42,50 +75,87 @@ func ListenNATS() { } if argo.Type == tools.STORAGE_RESOURCE { - fmt.Println("DETECT STORAGE ARGO_KUBE_EVENT") - // ── Minio credential provisioning ────────────────────────────── - setter := minio.NewMinioSetter(argo.ExecutionsID, argo.MinioID) - if argo.SourcePeerID == argo.DestPeerID { - fmt.Println("CONFIG MYSELF") - err := CreateNamespace(argo.ExecutionsID) - fmt.Println("NS", err) - // Same peer: source creates credentials and immediately stores them. - go setter.InitializeAsSource(context.Background(), argo.SourcePeerID, argo.DestPeerID, argo.OriginID) - } else { - // Different peers: publish Phase-1 PB_MINIO_CONFIG (Access == "") - // so oc-discovery routes the role-assignment to the Minio host. 
- phase1 := minio.MinioCredentialEvent{ + if argo.Local { + fmt.Println("DETECT LOCAL PVC ARGO_KUBE_EVENT") + // ── Local PVC provisioning ────────────────────────────────── + setter := storage.NewPVCSetter(argo.ExecutionsID, argo.MinioID) + event := storage.PVCProvisionEvent{ ExecutionsID: argo.ExecutionsID, - MinioID: argo.MinioID, + StorageID: argo.MinioID, + StorageName: argo.StorageName, SourcePeerID: argo.SourcePeerID, DestPeerID: argo.DestPeerID, OriginID: argo.OriginID, } - if b, err := json.Marshal(phase1); err == nil { - if b2, err := json.Marshal(&tools.PropalgationMessage{ - Payload: b, - Action: tools.PB_MINIO_CONFIG, - }); err == nil { - fmt.Println("CONFIG THEM") - go tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{ - FromApp: "oc-datacenter", - Datatype: -1, - User: resp.User, - Method: int(tools.PROPALGATION_EVENT), - Payload: b2, - }) + if argo.SourcePeerID == argo.DestPeerID { + fmt.Println("CONFIG PVC MYSELF") + err := CreateNamespace(argo.ExecutionsID) + fmt.Println("NS", err) + go setter.InitializeAsSource(context.Background(), event, true) + } else { + // Cross-peer: route to dest peer via PB_PVC_CONFIG. + if b, err := json.Marshal(event); err == nil { + if b2, err := json.Marshal(&tools.PropalgationMessage{ + Payload: b, + Action: tools.PB_PVC_CONFIG, + }); err == nil { + fmt.Println("CONFIG PVC THEM") + go tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{ + FromApp: "oc-datacenter", + Datatype: -1, + User: resp.User, + Method: int(tools.PROPALGATION_EVENT), + Payload: b2, + }) + } + } + } + } else { + fmt.Println("DETECT STORAGE ARGO_KUBE_EVENT") + // ── Minio credential provisioning ────────────────────────────── + setter := minio.NewMinioSetter(argo.ExecutionsID, argo.MinioID) + if argo.SourcePeerID == argo.DestPeerID { + fmt.Println("CONFIG MYSELF") + err := CreateNamespace(argo.ExecutionsID) + fmt.Println("NS", err) + go setter.InitializeAsSource(context.Background(), argo.SourcePeerID, argo.DestPeerID, argo.OriginID, true) + } else { + // Different peers: publish Phase-1 PB_MINIO_CONFIG (Access == "") + // so oc-discovery routes the role-assignment to the Minio host. 
+ phase1 := minio.MinioCredentialEvent{ + ExecutionsID: argo.ExecutionsID, + MinioID: argo.MinioID, + SourcePeerID: argo.SourcePeerID, + DestPeerID: argo.DestPeerID, + OriginID: argo.OriginID, + } + if b, err := json.Marshal(phase1); err == nil { + if b2, err := json.Marshal(&tools.PropalgationMessage{ + Payload: b, + Action: tools.PB_MINIO_CONFIG, + }); err == nil { + fmt.Println("CONFIG THEM") + go tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{ + FromApp: "oc-datacenter", + Datatype: -1, + User: resp.User, + Method: int(tools.PROPALGATION_EVENT), + Payload: b2, + }) + } } } } } else { fmt.Println("DETECT COMPUTE ARGO_KUBE_EVENT") // ── Admiralty kubeconfig provisioning (existing behaviour) ────── + fmt.Println(argo.SourcePeerID, argo.DestPeerID) if argo.SourcePeerID == argo.DestPeerID { fmt.Println("CONFIG MYSELF") err := CreateNamespace(argo.ExecutionsID) fmt.Println("NS", err) go NewAdmiraltySetter(argo.ExecutionsID).InitializeAsSource( - context.Background(), argo.SourcePeerID, argo.DestPeerID, argo.OriginID) + context.Background(), argo.SourcePeerID, argo.DestPeerID, argo.OriginID, true) } else if b, err := json.Marshal(argo); err == nil { if b2, err := json.Marshal(&tools.PropalgationMessage{ Payload: b, @@ -113,14 +183,16 @@ func ListenNATS() { if err := json.Unmarshal(resp.Payload, &kubeconfigEvent); err == nil { if kubeconfigEvent.Kubeconfig != "" { // Phase 2: kubeconfig present → this peer is the TARGET (scheduler). + fmt.Println("CreateAdmiraltyTarget") NewAdmiraltySetter(kubeconfigEvent.ExecutionsID).InitializeAsTarget( - context.Background(), kubeconfigEvent) + context.Background(), kubeconfigEvent, false) } else { err := CreateNamespace(kubeconfigEvent.ExecutionsID) fmt.Println("NS", err) // Phase 1: no kubeconfig → this peer is the SOURCE (compute). + fmt.Println("CreateAdmiraltySource") NewAdmiraltySetter(kubeconfigEvent.ExecutionsID).InitializeAsSource( - context.Background(), kubeconfigEvent.SourcePeerID, kubeconfigEvent.DestPeerID, kubeconfigEvent.OriginID) + context.Background(), kubeconfigEvent.SourcePeerID, kubeconfigEvent.DestPeerID, kubeconfigEvent.OriginID, false) } } }, @@ -134,27 +206,59 @@ func ListenNATS() { if minioEvent.Access != "" { // Phase 2: credentials present → this peer is the TARGET (compute). minio.NewMinioSetter(minioEvent.ExecutionsID, minioEvent.MinioID).InitializeAsTarget( - context.Background(), minioEvent) + context.Background(), minioEvent, false) } else { err := CreateNamespace(minioEvent.ExecutionsID) fmt.Println("NS", err) // Phase 1: no credentials → this peer is the SOURCE (Minio host). minio.NewMinioSetter(minioEvent.ExecutionsID, minioEvent.MinioID).InitializeAsSource( - context.Background(), minioEvent.SourcePeerID, minioEvent.DestPeerID, minioEvent.OriginID) + context.Background(), minioEvent.SourcePeerID, minioEvent.DestPeerID, minioEvent.OriginID, false) } } }, + // ─── PVC_CONFIG_EVENT ──────────────────────────────────────────────────────── + // Forwarded by oc-discovery for cross-peer local PVC provisioning. + // The dest peer creates the PVC in its own cluster. 
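Taken together, the ARGO_KUBE_EVENT handler now fans a single event out along three axes: resource type (storage vs compute), the new `Local` flag (PVC vs Minio), and same-peer vs cross-peer. Condensed into a pure decision function (enum values rendered as strings; the real handler switches on `tools.DataType`, creates the namespace in the self cases, and spawns goroutines):

```go
package main

import "fmt"

// argoKubeEvent keeps only the routing fields of ArgoKubeEvent.
type argoKubeEvent struct {
	Type         string // "STORAGE_RESOURCE" or "COMPUTE_RESOURCE"
	Local        bool   // storage only: true selects the local-PVC path
	SourcePeerID string
	DestPeerID   string
}

// dispatch reproduces the handler's decision tree.
func dispatch(e argoKubeEvent) string {
	same := e.SourcePeerID == e.DestPeerID
	if e.Type == "STORAGE_RESOURCE" {
		switch {
		case e.Local && same:
			return "pvc: create namespace, InitializeAsSource (self)"
		case e.Local:
			return "pvc: propagate via PB_PVC_CONFIG to dest peer"
		case same:
			return "minio: create namespace, InitializeAsSource (self)"
		default:
			return "minio: propagate phase-1 PB_MINIO_CONFIG (empty Access)"
		}
	}
	if same {
		return "admiralty: create namespace, InitializeAsSource (self)"
	}
	return "admiralty: propagate the event to the source peer"
}

func main() {
	fmt.Println(dispatch(argoKubeEvent{Type: "STORAGE_RESOURCE", Local: true, SourcePeerID: "a", DestPeerID: "a"}))
	fmt.Println(dispatch(argoKubeEvent{Type: "STORAGE_RESOURCE", SourcePeerID: "a", DestPeerID: "b"}))
	fmt.Println(dispatch(argoKubeEvent{Type: "COMPUTE_RESOURCE", SourcePeerID: "a", DestPeerID: "b"}))
}
```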
+	tools.PVC_CONFIG_EVENT: func(resp tools.NATSResponse) {
+		event := storage.PVCProvisionEvent{}
+		if err := json.Unmarshal(resp.Payload, &event); err == nil {
+			err := CreateNamespace(event.ExecutionsID)
+			fmt.Println("NS", err)
+			storage.NewPVCSetter(event.ExecutionsID, event.StorageID).InitializeAsSource(
+				context.Background(), event, false)
+		}
+	},
+
+	// ─── WORKFLOW_DONE_EVENT ─────────────────────────────────────────────────────
+	// Emitted by oc-monitord when the top-level Argo workflow reaches a terminal
+	// phase. oc-datacenter is responsible only for infrastructure teardown here:
+	// booking/execution state management is handled entirely by oc-scheduler.
+	tools.WORKFLOW_DONE_EVENT: func(resp tools.NATSResponse) {
+		var evt tools.WorkflowLifecycleEvent
+		if err := json.Unmarshal(resp.Payload, &evt); err != nil || evt.ExecutionsID == "" {
+			return
+		}
+		go teardownInfraForExecution(evt.ExecutionID, evt.ExecutionsID)
+	},
+
 	// ─── REMOVE_RESOURCE ────────────────────────────────────────────────────────
 	// Routed by oc-discovery via ProtocolDeleteResource for datacenter teardown.
 	// Only STORAGE_RESOURCE and COMPUTE_RESOURCE deletions are handled here.
 	tools.REMOVE_RESOURCE: func(resp tools.NATSResponse) {
 		switch resp.Datatype {
 		case tools.STORAGE_RESOURCE:
-			deleteEvent := minio.MinioDeleteEvent{}
-			if err := json.Unmarshal(resp.Payload, &deleteEvent); err == nil && deleteEvent.ExecutionsID != "" {
-				go minio.NewMinioSetter(deleteEvent.ExecutionsID, deleteEvent.MinioID).
-					TeardownAsSource(context.Background(), deleteEvent)
+			// Try a PVC delete first (recognised by a non-empty StorageName),
+			// then fall back to a Minio delete.
+			pvcEvent := storage.PVCDeleteEvent{}
+			if err := json.Unmarshal(resp.Payload, &pvcEvent); err == nil && pvcEvent.ExecutionsID != "" && pvcEvent.StorageName != "" {
+				go storage.NewPVCSetter(pvcEvent.ExecutionsID, pvcEvent.StorageID).
+					TeardownAsSource(context.Background(), pvcEvent)
+			} else {
+				deleteEvent := minio.MinioDeleteEvent{}
+				if err := json.Unmarshal(resp.Payload, &deleteEvent); err == nil && deleteEvent.ExecutionsID != "" {
+					go minio.NewMinioSetter(deleteEvent.ExecutionsID, deleteEvent.MinioID).
+						TeardownAsSource(context.Background(), deleteEvent)
+				}
 			}
 		case tools.COMPUTE_RESOURCE:
 			argo := &ArgoKubeEvent{}
diff --git a/infrastructure/storage/pvc_setter.go b/infrastructure/storage/pvc_setter.go
new file mode 100644
index 0000000..9ac7e7e
--- /dev/null
+++ b/infrastructure/storage/pvc_setter.go
@@ -0,0 +1,174 @@
+package storage
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"slices"
+	"strings"
+
+	"oc-datacenter/conf"
+
+	oclib "cloud.o-forge.io/core/oc-lib"
+	"cloud.o-forge.io/core/oc-lib/models/live"
+	"cloud.o-forge.io/core/oc-lib/tools"
+)
+
+// PVCProvisionEvent is the NATS payload for local PVC provisioning.
+// Same-peer deployments are handled directly; cross-peer routes via PB_PVC_CONFIG.
+type PVCProvisionEvent struct {
+	ExecutionsID string `json:"executions_id"`
+	StorageID    string `json:"storage_id"`
+	StorageName  string `json:"storage_name"`
+	SourcePeerID string `json:"source_peer_id"`
+	DestPeerID   string `json:"dest_peer_id"`
+	OriginID     string `json:"origin_id"`
+}
+
+// PVCDeleteEvent is the NATS payload for local PVC teardown.
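+// It mirrors PVCProvisionEvent field-for-field; a non-empty StorageName is what
+// lets the REMOVE_RESOURCE handler tell a PVC delete apart from a Minio delete.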
+type PVCDeleteEvent struct {
+	ExecutionsID string `json:"executions_id"`
+	StorageID    string `json:"storage_id"`
+	StorageName  string `json:"storage_name"`
+	SourcePeerID string `json:"source_peer_id"`
+	DestPeerID   string `json:"dest_peer_id"`
+	OriginID     string `json:"origin_id"`
+}
+
+// ClaimName returns the deterministic PVC name shared by oc-datacenter and oc-monitord.
+func ClaimName(storageName, executionsID string) string {
+	return strings.ReplaceAll(strings.ToLower(storageName), " ", "-") + "-" + executionsID
+}
+
+// PVCSetter carries the execution context for a local PVC provisioning.
+type PVCSetter struct {
+	ExecutionsID string
+	StorageID    string
+}
+
+func NewPVCSetter(execID, storageID string) *PVCSetter {
+	return &PVCSetter{ExecutionsID: execID, StorageID: storageID}
+}
+
+func emitConsiders(executionsID, originID string, provErr error, self bool) {
+	type pvcConsidersPayload struct {
+		OriginID     string  `json:"origin_id"`
+		ExecutionsID string  `json:"executions_id"`
+		Error        *string `json:"error,omitempty"`
+	}
+	var errStr *string
+	if provErr != nil {
+		s := provErr.Error()
+		errStr = &s
+	}
+	payload, _ := json.Marshal(pvcConsidersPayload{
+		OriginID:     originID,
+		ExecutionsID: executionsID,
+		Error:        errStr,
+	})
+	if self {
+		go tools.NewNATSCaller().SetNATSPub(tools.CONSIDERS_EVENT, tools.NATSResponse{
+			FromApp:  "oc-datacenter",
+			Datatype: tools.STORAGE_RESOURCE,
+			Method:   int(tools.CONSIDERS_EVENT),
+			Payload:  payload,
+		})
+		return
+	}
+	b, _ := json.Marshal(&tools.PropalgationMessage{
+		DataType: tools.STORAGE_RESOURCE.EnumIndex(),
+		Action:   tools.PB_CONSIDERS,
+		Payload:  payload,
+	})
+	go tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{
+		FromApp:  "oc-datacenter",
+		Datatype: -1,
+		Method:   int(tools.PROPALGATION_EVENT),
+		Payload:  b,
+	})
+}
+
+// InitializeAsSource creates the PVC in the execution namespace on the local cluster.
+// self must be true when source and dest are the same peer (direct CONSIDERS_EVENT emission).
+func (p *PVCSetter) InitializeAsSource(ctx context.Context, event PVCProvisionEvent, self bool) {
+	logger := oclib.GetLogger()
+
+	sizeStr, err := p.loadStorageSize(event.SourcePeerID)
+	if err != nil {
+		logger.Error().Msg("PVCSetter.InitializeAsSource: " + err.Error())
+		emitConsiders(event.ExecutionsID, event.OriginID, err, self)
+		return
+	}
+
+	k, err := tools.NewKubernetesService(
+		conf.GetConfig().KubeHost+":"+conf.GetConfig().KubePort,
+		conf.GetConfig().KubeCA, conf.GetConfig().KubeCert, conf.GetConfig().KubeData,
+	)
+	if err != nil {
+		logger.Error().Msg("PVCSetter.InitializeAsSource: failed to create k8s service: " + err.Error())
+		emitConsiders(event.ExecutionsID, event.OriginID, err, self)
+		return
+	}
+
+	claimName := ClaimName(event.StorageName, event.ExecutionsID)
+	if err := k.CreatePVC(ctx, claimName, event.ExecutionsID, sizeStr); err != nil {
+		logger.Error().Msg("PVCSetter.InitializeAsSource: failed to create PVC: " + err.Error())
+		emitConsiders(event.ExecutionsID, event.OriginID, err, self)
+		return
+	}
+
+	logger.Info().Msg("PVCSetter.InitializeAsSource: PVC " + claimName + " created in " + event.ExecutionsID)
+	emitConsiders(event.ExecutionsID, event.OriginID, nil, self)
+}
+
+// TeardownAsSource deletes the PVC from the execution namespace.
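+// Failures are only logged; unlike provisioning, no CONSIDERS event is emitted.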
+func (p *PVCSetter) TeardownAsSource(ctx context.Context, event PVCDeleteEvent) {
+	logger := oclib.GetLogger()
+
+	k, err := tools.NewKubernetesService(
+		conf.GetConfig().KubeHost+":"+conf.GetConfig().KubePort,
+		conf.GetConfig().KubeCA, conf.GetConfig().KubeCert, conf.GetConfig().KubeData,
+	)
+	if err != nil {
+		logger.Error().Msg("PVCSetter.TeardownAsSource: failed to create k8s service: " + err.Error())
+		return
+	}
+
+	claimName := ClaimName(event.StorageName, event.ExecutionsID)
+	if err := k.DeletePVC(ctx, claimName, event.ExecutionsID); err != nil {
+		logger.Error().Msg("PVCSetter.TeardownAsSource: failed to delete PVC: " + err.Error())
+		return
+	}
+
+	logger.Info().Msg("PVCSetter.TeardownAsSource: PVC " + claimName + " deleted from " + event.ExecutionsID)
+}
+
+// ResolveStorageName returns the live storage name for a given storageID, or "" if not found.
+func ResolveStorageName(storageID, peerID string) string {
+	res := oclib.NewRequest(oclib.LibDataEnum(oclib.LIVE_STORAGE), "", peerID, []string{}, nil).LoadAll(false)
+	if res.Err != "" {
+		return ""
+	}
+	for _, dbo := range res.Data {
+		l := dbo.(*live.LiveStorage)
+		if slices.Contains(l.ResourcesID, storageID) {
+			return l.GetName()
+		}
+	}
+	return ""
+}
+
+// loadStorageSize looks up the SizeGB for this storage in live storages,
+// falling back to a 10Gi default when no matching entry declares a positive size.
+func (p *PVCSetter) loadStorageSize(peerID string) (string, error) {
+	res := oclib.NewRequest(oclib.LibDataEnum(oclib.LIVE_STORAGE), "", peerID, []string{}, nil).LoadAll(false)
+	if res.Err != "" {
+		return "", fmt.Errorf("loadStorageSize: %s", res.Err)
+	}
+	for _, dbo := range res.Data {
+		l := dbo.(*live.LiveStorage)
+		if slices.Contains(l.ResourcesID, p.StorageID) && l.SizeGB > 0 {
+			return fmt.Sprintf("%dGi", l.SizeGB), nil
+		}
+	}
+	return "10Gi", nil
+}
diff --git a/main.go b/main.go
index 09ae6c0..ed46659 100644
--- a/main.go
+++ b/main.go
@@ -1,11 +1,9 @@
 package main
 
 import (
-	"encoding/base64"
 	"oc-datacenter/conf"
 	"oc-datacenter/infrastructure"
 	_ "oc-datacenter/routers"
-	"os"
 
 	oclib "cloud.o-forge.io/core/oc-lib"
 	beego "github.com/beego/beego/v2/server/web"
@@ -17,21 +15,13 @@ func main() {
 	// Load the right config file
 	o := oclib.GetConfLoader(appname)
 	conf.GetConfig().Mode = o.GetStringDefault("MODE", "kubernetes")
-	conf.GetConfig().KubeHost = o.GetStringDefault("KUBERNETES_SERVICE_HOST", os.Getenv("KUBERNETES_SERVICE_HOST"))
+	conf.GetConfig().KubeHost = o.GetStringDefault("KUBERNETES_SERVICE_HOST", "kubernetes.default.svc.cluster.local")
 	conf.GetConfig().KubePort = o.GetStringDefault("KUBERNETES_SERVICE_PORT", "6443")
+	conf.GetConfig().KubeExternalHost = o.GetStringDefault("KUBE_EXTERNAL_HOST", "")
 
-	sDec, err := base64.StdEncoding.DecodeString(o.GetStringDefault("KUBE_CA", ""))
-	if err == nil {
-		conf.GetConfig().KubeCA = string(sDec)
-	}
-	sDec, err = base64.StdEncoding.DecodeString(o.GetStringDefault("KUBE_CERT", ""))
-	if err == nil {
-		conf.GetConfig().KubeCert = string(sDec)
-	}
-	sDec, err = base64.StdEncoding.DecodeString(o.GetStringDefault("KUBE_DATA", ""))
-	if err == nil {
-		conf.GetConfig().KubeData = string(sDec)
-	}
+	conf.GetConfig().KubeCA = o.GetStringDefault("KUBE_CA", "LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJkakNDQVIyZ0F3SUJBZ0lCQURBS0JnZ3Foa2pPUFFRREFqQWpNU0V3SHdZRFZRUUREQmhyTTNNdGMyVnkKZG1WeUxXTmhRREUzTnpNeE1qY3dPVFl3SGhjTk1qWXdNekV3TURjeE9ERTJXaGNOTXpZd016QTNNRGN4T0RFMgpXakFqTVNFd0h3WURWUVFEREJock0zTXRjMlZ5ZG1WeUxXTmhRREUzTnpNeE1qY3dPVFl3V1RBVEJnY3Foa2pPClBRSUJCZ2dxaGtqT1BRTUJCd05DQUFReG81cXQ0MGxEekczRHJKTE1wRVBrd0ZBY1FmbC8vVE1iWjZzemMreHAKbmVzVzRTSTdXK1lWdFpRYklmV2xBMTRaazQvRFlDMHc1YlgxZU94RVVuL0pvMEl3UURBT0JnTlZIUThCQWY4RQpCQU1DQXFRd0R3WURWUjBUQVFIL0JBVXdBd0VCL3pBZEJnTlZIUTRFRmdRVXBLM2pGK25IRlZSbDcwb3ZRVGZnCmZabGNQZE13Q2dZSUtvWkl6ajBFQXdJRFJ3QXdSQUlnVnkyaUx0Y0xaYm1vTnVoVHdKbU5sWlo3RVlBYjJKNW0KSjJYbG1UbVF5a2tDSUhLbzczaDBkdEtUZTlSa0NXYTJNdStkS1FzOXRFU0tBV0x1emlnYXBHYysKLS0tLS1FTkQgQ0VSVElGSUNBVEUtLS0tLQo=")
+	conf.GetConfig().KubeCert = o.GetStringDefault("KUBE_CERT", "LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJrakNDQVRlZ0F3SUJBZ0lJQUkvSUg2R2Rodm93Q2dZSUtvWkl6ajBFQXdJd0l6RWhNQjhHQTFVRUF3d1kKYXpOekxXTnNhV1Z1ZEMxallVQXhOemN6TVRJM01EazJNQjRYRFRJMk1ETXhNREEzTVRneE5sb1hEVEkzTURNeApNREEzTVRneE5sb3dNREVYTUJVR0ExVUVDaE1PYzNsemRHVnRPbTFoYzNSbGNuTXhGVEFUQmdOVkJBTVRESE41CmMzUmxiVHBoWkcxcGJqQlpNQk1HQnlxR1NNNDlBZ0VHQ0NxR1NNNDlBd0VIQTBJQUJQTTdBVEZQSmFMMjUrdzAKUU1vZUIxV2hBRW4vWnViM0tSRERrYnowOFhwQWJ2akVpdmdnTkdpdG4wVmVsaEZHamRmNHpBT29Nd1J3M21kbgpYSGtHVDB5alNEQkdNQTRHQTFVZER3RUIvd1FFQXdJRm9EQVRCZ05WSFNVRUREQUtCZ2dyQmdFRkJRY0RBakFmCkJnTlZIU01FR0RBV2dCUVZLOThaMEMxcFFyVFJSMGVLZHhIa2o0ejFJREFLQmdncWhrak9QUVFEQWdOSkFEQkcKQWlFQXZYWll6Zk9iSUtlWTRtclNsRmt4ZS80a0E4K01ieDc1UDFKRmNlRS8xdGNDSVFDNnM0ZXlZclhQYmNWSgpxZm5EamkrZ1RacGttN0tWSTZTYTlZN2FSRGFabUE9PQotLS0tLUVORCBDRVJUSUZJQ0FURS0tLS0tCi0tLS0tQkVHSU4gQ0VSVElGSUNBVEUtLS0tLQpNSUlCZURDQ0FSMmdBd0lCQWdJQkFEQUtCZ2dxaGtqT1BRUURBakFqTVNFd0h3WURWUVFEREJock0zTXRZMnhwClpXNTBMV05oUURFM056TXhNamN3T1RZd0hoY05Nall3TXpFd01EY3hPREUyV2hjTk16WXdNekEzTURjeE9ERTIKV2pBak1TRXdId1lEVlFRRERCaHJNM010WTJ4cFpXNTBMV05oUURFM056TXhNamN3T1RZd1dUQVRCZ2NxaGtqTwpQUUlCQmdncWhrak9QUU1CQndOQ0FBUzV1NGVJbStvVnV1SFI0aTZIOU1kVzlyUHdJbFVPNFhIMEJWaDRUTGNlCkNkMnRBbFVXUW5FakxMdlpDWlVaYTlzTlhKOUVtWWt5S0dtQWR2TE9FbUVrbzBJd1FEQU9CZ05WSFE4QkFmOEUKQkFNQ0FxUXdEd1lEVlIwVEFRSC9CQVV3QXdFQi96QWRCZ05WSFE0RUZnUVVGU3ZmR2RBdGFVSzAwVWRIaW5jUgo1SStNOVNBd0NnWUlLb1pJemowRUF3SURTUUF3UmdJaEFMY2xtQnR4TnpSVlBvV2hoVEVKSkM1Z3VNSGsvcFZpCjFvYXJ2UVJxTWRKcUFpRUEyR1dNTzlhZFFYTEQwbFZKdHZMVkc1M3I0M0lxMHpEUUQwbTExMVZyL1MwPQotLS0tLUVORCBDRVJUSUZJQ0FURS0tLS0tCg==")
+	conf.GetConfig().KubeData = o.GetStringDefault("KUBE_DATA", "LS0tLS1CRUdJTiBFQyBQUklWQVRFIEtFWS0tLS0tCk1IY0NBUUVFSUVkSTRZN3lRU1ZwRGNrblhsQmJEaXBWZHRMWEVsYVBkN3VBZHdBWFFya2xvQW9HQ0NxR1NNNDkKQXdFSG9VUURRZ0FFOHpzQk1VOGxvdmJuN0RSQXloNEhWYUVBU2Y5bTV2Y3BFTU9SdlBUeGVrQnUrTVNLK0NBMAphSzJmUlY2V0VVYU4xL2pNQTZnekJIRGVaMmRjZVFaUFRBPT0KLS0tLS1FTkQgRUMgUFJJVkFURSBLRVktLS0tLQo=")
 
 	conf.GetConfig().MonitorMode = o.GetStringDefault("MONITOR_MODE", "prometheus")
 	conf.GetConfig().MinioRootKey = o.GetStringDefault("MINIO_ADMIN_ACCESS", "")
@@ -40,6 +30,7 @@ func main() {
 
 	go infrastructure.ListenNATS()
 	go infrastructure.WatchBookings()
+	go infrastructure.WatchInfra()
 
 	beego.Run()
 }