From 6c0b07b49d97549c365a89dd4236920b4e35d5c0 Mon Sep 17 00:00:00 2001 From: mr Date: Wed, 18 Mar 2026 16:43:44 +0100 Subject: [PATCH] Missing Create Namespace --- go.mod | 2 +- go.sum | 2 + infrastructure/monitor/prometheus.go | 2 +- infrastructure/monitor/vector.go | 2 +- infrastructure/namespace.go | 65 +++++++++++++++ infrastructure/nats.go | 119 ++++++++++++++------------- 6 files changed, 134 insertions(+), 58 deletions(-) create mode 100644 infrastructure/namespace.go diff --git a/go.mod b/go.mod index 6062364..7da023e 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module oc-datacenter go 1.25.0 require ( - cloud.o-forge.io/core/oc-lib v0.0.0-20260224130821-ce8ef70516f7 + cloud.o-forge.io/core/oc-lib v0.0.0-20260318143822-5976795d4406 github.com/beego/beego/v2 v2.3.8 github.com/golang-jwt/jwt/v5 v5.2.2 github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 diff --git a/go.sum b/go.sum index 62fefb2..dfd6965 100644 --- a/go.sum +++ b/go.sum @@ -18,6 +18,8 @@ cloud.o-forge.io/core/oc-lib v0.0.0-20260224120019-0f6aa1fe7881 h1:1JUGErc+3Rund cloud.o-forge.io/core/oc-lib v0.0.0-20260224120019-0f6aa1fe7881/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= cloud.o-forge.io/core/oc-lib v0.0.0-20260224130821-ce8ef70516f7 h1:p9uJjMY+QkE4neA+xRmIRtAm9us94EKZqgajDdLOd0Y= cloud.o-forge.io/core/oc-lib v0.0.0-20260224130821-ce8ef70516f7/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= +cloud.o-forge.io/core/oc-lib v0.0.0-20260318143822-5976795d4406 h1:FN1EtRWn228JprAbnY5K863Fzj+SzMqQtKRtwvECbLw= +cloud.o-forge.io/core/oc-lib v0.0.0-20260318143822-5976795d4406/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/beego/beego/v2 v2.3.8 h1:wplhB1pF4TxR+2SS4PUej8eDoH4xGfxuHfS7wAk9VBc= github.com/beego/beego/v2 v2.3.8/go.mod h1:8vl9+RrXqvodrl9C8yivX1e6le6deCK6RWeq8R7gTTg= diff --git a/infrastructure/monitor/prometheus.go 
b/infrastructure/monitor/prometheus.go index 1b0bf47..237878d 100644 --- a/infrastructure/monitor/prometheus.go +++ b/infrastructure/monitor/prometheus.go @@ -187,7 +187,7 @@ func (p *PrometheusService) Stream(ctx context.Context, bookingID string, interv bk.ExecutionMetrics[kk] = append(bk.ExecutionMetrics[kk], vv...) } } - bk.GetAccessor(nil).UpdateOne(bk, bookingID) + bk.GetAccessor(nil).UpdateOne(bk.Serialize(bk), bookingID) mets = map[string][]models.MetricsSnapshot{} count = 0 } diff --git a/infrastructure/monitor/vector.go b/infrastructure/monitor/vector.go index c5c1661..9cc45a5 100644 --- a/infrastructure/monitor/vector.go +++ b/infrastructure/monitor/vector.go @@ -114,7 +114,7 @@ func (v *VectorService) ListenVector(ctx context.Context, b *booking.Booking, in b.ExecutionMetrics[kk] = append(b.ExecutionMetrics[kk], vv...) } } - b.GetAccessor(nil).UpdateOne(b, b.GetID()) + b.GetAccessor(nil).UpdateOne(b.Serialize(b), b.GetID()) mets = map[string][]models.MetricsSnapshot{} count = 0 } diff --git a/infrastructure/namespace.go b/infrastructure/namespace.go new file mode 100644 index 0000000..f073eaf --- /dev/null +++ b/infrastructure/namespace.go @@ -0,0 +1,65 @@ +package infrastructure + +import ( + "context" + "oc-datacenter/conf" + + oclib "cloud.o-forge.io/core/oc-lib" + "cloud.o-forge.io/core/oc-lib/tools" +) + +// --------------------------------------------------------------------------- +// Kubernetes namespace helper +// --------------------------------------------------------------------------- + +func CreateNamespace(ns string) error { + /* + * This function is used to create a namespace. 
+ * It takes the following parameters: + * - ns: the namespace you want to create + */ + logger := oclib.GetLogger() + serv, err := tools.NewKubernetesService( + conf.GetConfig().KubeHost+":"+conf.GetConfig().KubePort, conf.GetConfig().KubeCA, + conf.GetConfig().KubeCert, conf.GetConfig().KubeData) + if err != nil { + return err + } + c := context.Background() + + ok, err := serv.GetNamespace(c, ns) + if ok != nil && err == nil { + logger.Debug().Msg("A namespace with name " + ns + " already exists") + return nil + } + if err != nil { + return err + } + + err = serv.CreateNamespace(c, ns) + if err != nil { + return err + } + err = serv.CreateServiceAccount(c, ns) + if err != nil { + return err + } + role := "argo-role" + err = serv.CreateRole(c, ns, role, + [][]string{ + {"coordination.k8s.io"}, + {""}, + {""}}, + [][]string{ + {"leases"}, + {"secrets"}, + {"pods"}}, + [][]string{ + {"get", "create", "update"}, + {"get"}, + {"patch"}}) + if err != nil { + return err + } + return serv.CreateRoleBinding(c, ns, "argo-role-binding", role) +} diff --git a/infrastructure/nats.go b/infrastructure/nats.go index 3cfb258..54879b7 100644 --- a/infrastructure/nats.go +++ b/infrastructure/nats.go @@ -3,6 +3,7 @@ package infrastructure import ( "context" "encoding/json" + "fmt" "oc-datacenter/infrastructure/minio" "sync" @@ -41,9 +42,13 @@ func ListenNATS() { } if argo.Type == tools.STORAGE_RESOURCE { + fmt.Println("DETECT STORAGE ARGO_KUBE_EVENT") // ── Minio credential provisioning ────────────────────────────── setter := minio.NewMinioSetter(argo.ExecutionsID, argo.MinioID) if argo.SourcePeerID == argo.DestPeerID { + fmt.Println("CONFIG MYSELF") + err := CreateNamespace(argo.ExecutionsID) + fmt.Println("NS", err) // Same peer: source creates credentials and immediately stores them.
go setter.InitializeAsSource(context.Background(), argo.SourcePeerID, argo.DestPeerID, argo.OriginID) } else { @@ -61,6 +66,7 @@ func ListenNATS() { Payload: b, Action: tools.PB_MINIO_CONFIG, }); err == nil { + fmt.Println("CONFIG THEM") go tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{ FromApp: "oc-datacenter", Datatype: -1, @@ -72,8 +78,12 @@ func ListenNATS() { } } } else { + fmt.Println("DETECT COMPUTE ARGO_KUBE_EVENT") // ── Admiralty kubeconfig provisioning (existing behaviour) ────── if argo.SourcePeerID == argo.DestPeerID { + fmt.Println("CONFIG MYSELF") + err := CreateNamespace(argo.ExecutionsID) + fmt.Println("NS", err) go NewAdmiraltySetter(argo.ExecutionsID).InitializeAsSource( context.Background(), argo.SourcePeerID, argo.DestPeerID, argo.OriginID) } else if b, err := json.Marshal(argo); err == nil { @@ -81,6 +91,7 @@ func ListenNATS() { Payload: b, Action: tools.PB_ADMIRALTY_CONFIG, }); err == nil { + fmt.Println("CONFIG THEM") go tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{ FromApp: "oc-datacenter", Datatype: -1, @@ -93,63 +104,61 @@ func ListenNATS() { } }, - // ─── PROPALGATION_EVENT ───────────────────────────────────────────────────── - // Routes messages forwarded by oc-discovery to the right handler. - tools.PROPALGATION_EVENT: func(resp tools.NATSResponse) { - if resp.FromApp != "oc-discovery" { - return - } - var prop tools.PropalgationMessage - if err := json.Unmarshal(resp.Payload, &prop); err != nil { - return - } - switch prop.Action { + // ─── ADMIRALTY_CONFIG_EVENT ───────────────────────────────────────────────── + // Forwarded by oc-discovery after receiving via libp2p ProtocolAdmiraltyConfigResource. + // Payload is a KubeconfigEvent (phase discriminated by Kubeconfig presence). 
+ tools.ADMIRALTY_CONFIG_EVENT: func(resp tools.NATSResponse) { - // ── Admiralty ────────────────────────────────────────────────────── - case tools.PB_ADMIRALTY_CONFIG: - kubeconfigEvent := KubeconfigEvent{} - if err := json.Unmarshal(prop.Payload, &kubeconfigEvent); err == nil { - if kubeconfigEvent.Kubeconfig != "" { - // Phase 2: kubeconfig present → this peer is the TARGET (scheduler). - NewAdmiraltySetter(kubeconfigEvent.ExecutionsID).InitializeAsTarget( - context.Background(), kubeconfigEvent) - } else { - // Phase 1: no kubeconfig → this peer is the SOURCE (compute). - NewAdmiraltySetter(kubeconfigEvent.ExecutionsID).InitializeAsSource( - context.Background(), kubeconfigEvent.SourcePeerID, kubeconfigEvent.DestPeerID, kubeconfigEvent.OriginID) - } - } - - // ── Minio ────────────────────────────────────────────────────────── - case tools.PB_MINIO_CONFIG: - minioEvent := minio.MinioCredentialEvent{} - if err := json.Unmarshal(prop.Payload, &minioEvent); err == nil { - if minioEvent.Access != "" { - // Phase 2: credentials present → this peer is the TARGET (compute). - minio.NewMinioSetter(minioEvent.ExecutionsID, minioEvent.MinioID).InitializeAsTarget( - context.Background(), minioEvent) - } else { - // Phase 1: no credentials → this peer is the SOURCE (Minio host). - minio.NewMinioSetter(minioEvent.ExecutionsID, minioEvent.MinioID).InitializeAsSource( - context.Background(), minioEvent.SourcePeerID, minioEvent.DestPeerID, minioEvent.OriginID) - } - } - - // ── Deletion (routed by oc-discovery to the source peer) ─────────── - case tools.PB_DELETE: - argo := &ArgoKubeEvent{} - if err := json.Unmarshal(prop.Payload, argo); err != nil || argo.ExecutionsID == "" { - return - } - if argo.Type == tools.STORAGE_RESOURCE { - // Minio source teardown: revoke credentials + delete bucket. 
- deleteEvent := minio.MinioDeleteEvent{} - if err := json.Unmarshal(prop.Payload, &deleteEvent); err == nil { - go minio.NewMinioSetter(deleteEvent.ExecutionsID, deleteEvent.MinioID). - TeardownAsSource(context.Background(), deleteEvent) - } + kubeconfigEvent := KubeconfigEvent{} + if err := json.Unmarshal(resp.Payload, &kubeconfigEvent); err == nil { + if kubeconfigEvent.Kubeconfig != "" { + // Phase 2: kubeconfig present → this peer is the TARGET (scheduler). + NewAdmiraltySetter(kubeconfigEvent.ExecutionsID).InitializeAsTarget( + context.Background(), kubeconfigEvent) } else { - // Admiralty source teardown: delete AdmiraltySource + namespace. + err := CreateNamespace(kubeconfigEvent.ExecutionsID) + fmt.Println("NS", err) + // Phase 1: no kubeconfig → this peer is the SOURCE (compute). + NewAdmiraltySetter(kubeconfigEvent.ExecutionsID).InitializeAsSource( + context.Background(), kubeconfigEvent.SourcePeerID, kubeconfigEvent.DestPeerID, kubeconfigEvent.OriginID) + } + } + }, + + // ─── MINIO_CONFIG_EVENT ────────────────────────────────────────────────────── + // Forwarded by oc-discovery after receiving via libp2p ProtocolMinioConfigResource. + // Payload is a MinioCredentialEvent (phase discriminated by Access presence). + tools.MINIO_CONFIG_EVENT: func(resp tools.NATSResponse) { + minioEvent := minio.MinioCredentialEvent{} + if err := json.Unmarshal(resp.Payload, &minioEvent); err == nil { + if minioEvent.Access != "" { + // Phase 2: credentials present → this peer is the TARGET (compute). + minio.NewMinioSetter(minioEvent.ExecutionsID, minioEvent.MinioID).InitializeAsTarget( + context.Background(), minioEvent) + } else { + err := CreateNamespace(minioEvent.ExecutionsID) + fmt.Println("NS", err) + // Phase 1: no credentials → this peer is the SOURCE (Minio host). 
+ minio.NewMinioSetter(minioEvent.ExecutionsID, minioEvent.MinioID).InitializeAsSource( + context.Background(), minioEvent.SourcePeerID, minioEvent.DestPeerID, minioEvent.OriginID) + } + } + }, + + // ─── REMOVE_RESOURCE ──────────────────────────────────────────────────────── + // Routed by oc-discovery via ProtocolDeleteResource for datacenter teardown. + // Only STORAGE_RESOURCE and COMPUTE_RESOURCE deletions are handled here. + tools.REMOVE_RESOURCE: func(resp tools.NATSResponse) { + switch resp.Datatype { + case tools.STORAGE_RESOURCE: + deleteEvent := minio.MinioDeleteEvent{} + if err := json.Unmarshal(resp.Payload, &deleteEvent); err == nil && deleteEvent.ExecutionsID != "" { + go minio.NewMinioSetter(deleteEvent.ExecutionsID, deleteEvent.MinioID). + TeardownAsSource(context.Background(), deleteEvent) + } + case tools.COMPUTE_RESOURCE: + argo := &ArgoKubeEvent{} + if err := json.Unmarshal(resp.Payload, argo); err == nil && argo.ExecutionsID != "" { go NewAdmiraltySetter(argo.ExecutionsID).TeardownAsSource(context.Background()) } }