Add missing CreateNamespace helper (namespace + service account + Argo RBAC provisioning)

This commit is contained in:
mr
2026-03-18 16:43:44 +01:00
parent e4834db518
commit 6c0b07b49d
6 changed files with 134 additions and 58 deletions

View File

@@ -187,7 +187,7 @@ func (p *PrometheusService) Stream(ctx context.Context, bookingID string, interv
bk.ExecutionMetrics[kk] = append(bk.ExecutionMetrics[kk], vv...)
}
}
bk.GetAccessor(nil).UpdateOne(bk, bookingID)
bk.GetAccessor(nil).UpdateOne(bk.Serialize(bk), bookingID)
mets = map[string][]models.MetricsSnapshot{}
count = 0
}

View File

@@ -114,7 +114,7 @@ func (v *VectorService) ListenVector(ctx context.Context, b *booking.Booking, in
b.ExecutionMetrics[kk] = append(b.ExecutionMetrics[kk], vv...)
}
}
b.GetAccessor(nil).UpdateOne(b, b.GetID())
b.GetAccessor(nil).UpdateOne(b.Serialize(b), b.GetID())
mets = map[string][]models.MetricsSnapshot{}
count = 0
}

View File

@@ -0,0 +1,65 @@
package infrastructure
import (
"context"
"oc-datacenter/conf"
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/tools"
)
// ---------------------------------------------------------------------------
// Kubernetes namespace helper
// ---------------------------------------------------------------------------
// CreateNamespace ensures a Kubernetes namespace named ns exists and is
// provisioned with the service account, role, and role binding required by
// Argo. It is idempotent: if a namespace with that name already exists, the
// function logs a debug message and returns nil without modifying it.
//
// Parameters:
//   - ns: the name of the namespace to create.
//
// Returns a non-nil error if connecting to the cluster or any provisioning
// step fails.
func CreateNamespace(ns string) error {
	logger := oclib.GetLogger()
	serv, err := tools.NewKubernetesService(
		conf.GetConfig().KubeHost+":"+conf.GetConfig().KubePort, conf.GetConfig().KubeCA,
		conf.GetConfig().KubeCert, conf.GetConfig().KubeData)
	if err != nil {
		// BUG FIX: this previously returned nil, silently swallowing the
		// connection error and reporting success to the caller even though
		// nothing was (or could be) created.
		return err
	}
	c := context.Background()
	existing, err := serv.GetNamespace(c, ns)
	if existing != nil && err == nil {
		logger.Debug().Msg("A namespace with name " + ns + " already exists")
		return nil
	}
	if err != nil {
		// NOTE(review): this assumes GetNamespace returns (nil, nil) when the
		// namespace is simply absent; if it instead errors on "not found",
		// this aborts before creation — confirm against the tools package.
		return err
	}
	if err = serv.CreateNamespace(c, ns); err != nil {
		return err
	}
	if err = serv.CreateServiceAccount(c, ns); err != nil {
		return err
	}
	// Role granting the minimal permissions needed in this namespace:
	// leases get/create/update (leader election), secrets get, pods patch.
	role := "argo-role"
	err = serv.CreateRole(c, ns, role,
		[][]string{
			{"coordination.k8s.io"},
			{""},
			{""}},
		[][]string{
			{"leases"},
			{"secrets"},
			{"pods"}},
		[][]string{
			{"get", "create", "update"},
			{"get"},
			{"patch"}})
	if err != nil {
		return err
	}
	// Bind the role to the service account created above.
	return serv.CreateRoleBinding(c, ns, "argo-role-binding", role)
}

View File

@@ -3,6 +3,7 @@ package infrastructure
import (
"context"
"encoding/json"
"fmt"
"oc-datacenter/infrastructure/minio"
"sync"
@@ -41,9 +42,13 @@ func ListenNATS() {
}
if argo.Type == tools.STORAGE_RESOURCE {
fmt.Println("DETECT STORAGE ARGO_KUBE_EVENT")
// ── Minio credential provisioning ──────────────────────────────
setter := minio.NewMinioSetter(argo.ExecutionsID, argo.MinioID)
if argo.SourcePeerID == argo.DestPeerID {
fmt.Println("CONFIG MYSELF")
err := CreateNamespace(argo.ExecutionsID)
fmt.Println("NS", err)
// Same peer: source creates credentials and immediately stores them.
go setter.InitializeAsSource(context.Background(), argo.SourcePeerID, argo.DestPeerID, argo.OriginID)
} else {
@@ -61,6 +66,7 @@ func ListenNATS() {
Payload: b,
Action: tools.PB_MINIO_CONFIG,
}); err == nil {
fmt.Println("CONFIG THEM")
go tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{
FromApp: "oc-datacenter",
Datatype: -1,
@@ -72,8 +78,12 @@ func ListenNATS() {
}
}
} else {
fmt.Println("DETECT COMPUTE ARGO_KUBE_EVENT")
// ── Admiralty kubeconfig provisioning (existing behaviour) ──────
if argo.SourcePeerID == argo.DestPeerID {
fmt.Println("CONFIG MYSELF")
err := CreateNamespace(argo.ExecutionsID)
fmt.Println("NS", err)
go NewAdmiraltySetter(argo.ExecutionsID).InitializeAsSource(
context.Background(), argo.SourcePeerID, argo.DestPeerID, argo.OriginID)
} else if b, err := json.Marshal(argo); err == nil {
@@ -81,6 +91,7 @@ func ListenNATS() {
Payload: b,
Action: tools.PB_ADMIRALTY_CONFIG,
}); err == nil {
fmt.Println("CONFIG THEM")
go tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{
FromApp: "oc-datacenter",
Datatype: -1,
@@ -93,63 +104,61 @@ func ListenNATS() {
}
},
// ─── PROPALGATION_EVENT ─────────────────────────────────────────────────────
// Routes messages forwarded by oc-discovery to the right handler.
tools.PROPALGATION_EVENT: func(resp tools.NATSResponse) {
if resp.FromApp != "oc-discovery" {
return
}
var prop tools.PropalgationMessage
if err := json.Unmarshal(resp.Payload, &prop); err != nil {
return
}
switch prop.Action {
// ─── ADMIRALTY_CONFIG_EVENT ─────────────────────────────────────────────────
// Forwarded by oc-discovery after receiving via libp2p ProtocolAdmiraltyConfigResource.
// Payload is a KubeconfigEvent (phase discriminated by Kubeconfig presence).
tools.ADMIRALTY_CONFIG_EVENT: func(resp tools.NATSResponse) {
// ── Admiralty ──────────────────────────────────────────────────────
case tools.PB_ADMIRALTY_CONFIG:
kubeconfigEvent := KubeconfigEvent{}
if err := json.Unmarshal(prop.Payload, &kubeconfigEvent); err == nil {
if kubeconfigEvent.Kubeconfig != "" {
// Phase 2: kubeconfig present → this peer is the TARGET (scheduler).
NewAdmiraltySetter(kubeconfigEvent.ExecutionsID).InitializeAsTarget(
context.Background(), kubeconfigEvent)
} else {
// Phase 1: no kubeconfig → this peer is the SOURCE (compute).
NewAdmiraltySetter(kubeconfigEvent.ExecutionsID).InitializeAsSource(
context.Background(), kubeconfigEvent.SourcePeerID, kubeconfigEvent.DestPeerID, kubeconfigEvent.OriginID)
}
}
// ── Minio ──────────────────────────────────────────────────────────
case tools.PB_MINIO_CONFIG:
minioEvent := minio.MinioCredentialEvent{}
if err := json.Unmarshal(prop.Payload, &minioEvent); err == nil {
if minioEvent.Access != "" {
// Phase 2: credentials present → this peer is the TARGET (compute).
minio.NewMinioSetter(minioEvent.ExecutionsID, minioEvent.MinioID).InitializeAsTarget(
context.Background(), minioEvent)
} else {
// Phase 1: no credentials → this peer is the SOURCE (Minio host).
minio.NewMinioSetter(minioEvent.ExecutionsID, minioEvent.MinioID).InitializeAsSource(
context.Background(), minioEvent.SourcePeerID, minioEvent.DestPeerID, minioEvent.OriginID)
}
}
// ── Deletion (routed by oc-discovery to the source peer) ───────────
case tools.PB_DELETE:
argo := &ArgoKubeEvent{}
if err := json.Unmarshal(prop.Payload, argo); err != nil || argo.ExecutionsID == "" {
return
}
if argo.Type == tools.STORAGE_RESOURCE {
// Minio source teardown: revoke credentials + delete bucket.
deleteEvent := minio.MinioDeleteEvent{}
if err := json.Unmarshal(prop.Payload, &deleteEvent); err == nil {
go minio.NewMinioSetter(deleteEvent.ExecutionsID, deleteEvent.MinioID).
TeardownAsSource(context.Background(), deleteEvent)
}
kubeconfigEvent := KubeconfigEvent{}
if err := json.Unmarshal(resp.Payload, &kubeconfigEvent); err == nil {
if kubeconfigEvent.Kubeconfig != "" {
// Phase 2: kubeconfig present → this peer is the TARGET (scheduler).
NewAdmiraltySetter(kubeconfigEvent.ExecutionsID).InitializeAsTarget(
context.Background(), kubeconfigEvent)
} else {
// Admiralty source teardown: delete AdmiraltySource + namespace.
err := CreateNamespace(kubeconfigEvent.ExecutionsID)
fmt.Println("NS", err)
// Phase 1: no kubeconfig → this peer is the SOURCE (compute).
NewAdmiraltySetter(kubeconfigEvent.ExecutionsID).InitializeAsSource(
context.Background(), kubeconfigEvent.SourcePeerID, kubeconfigEvent.DestPeerID, kubeconfigEvent.OriginID)
}
}
},
// ─── MINIO_CONFIG_EVENT ──────────────────────────────────────────────────────
// Forwarded by oc-discovery after receiving via libp2p ProtocolMinioConfigResource.
// Payload is a MinioCredentialEvent (phase discriminated by Access presence).
tools.MINIO_CONFIG_EVENT: func(resp tools.NATSResponse) {
minioEvent := minio.MinioCredentialEvent{}
if err := json.Unmarshal(resp.Payload, &minioEvent); err == nil {
if minioEvent.Access != "" {
// Phase 2: credentials present → this peer is the TARGET (compute).
minio.NewMinioSetter(minioEvent.ExecutionsID, minioEvent.MinioID).InitializeAsTarget(
context.Background(), minioEvent)
} else {
err := CreateNamespace(minioEvent.ExecutionsID)
fmt.Println("NS", err)
// Phase 1: no credentials → this peer is the SOURCE (Minio host).
minio.NewMinioSetter(minioEvent.ExecutionsID, minioEvent.MinioID).InitializeAsSource(
context.Background(), minioEvent.SourcePeerID, minioEvent.DestPeerID, minioEvent.OriginID)
}
}
},
// ─── REMOVE_RESOURCE ────────────────────────────────────────────────────────
// Routed by oc-discovery via ProtocolDeleteResource for datacenter teardown.
// Only STORAGE_RESOURCE and COMPUTE_RESOURCE deletions are handled here.
tools.REMOVE_RESOURCE: func(resp tools.NATSResponse) {
switch resp.Datatype {
case tools.STORAGE_RESOURCE:
deleteEvent := minio.MinioDeleteEvent{}
if err := json.Unmarshal(resp.Payload, &deleteEvent); err == nil && deleteEvent.ExecutionsID != "" {
go minio.NewMinioSetter(deleteEvent.ExecutionsID, deleteEvent.MinioID).
TeardownAsSource(context.Background(), deleteEvent)
}
case tools.COMPUTE_RESOURCE:
argo := &ArgoKubeEvent{}
if err := json.Unmarshal(resp.Payload, argo); err == nil && argo.ExecutionsID != "" {
go NewAdmiraltySetter(argo.ExecutionsID).TeardownAsSource(context.Background())
}
}