package admiralty

import (
	"context"
	"encoding/base64"
	"encoding/json"
	"errors"
	"fmt"
	"strings"
	"sync"
	"time"

	"oc-datacenter/conf"
	"oc-datacenter/infrastructure/kubernetes/models"
	"oc-datacenter/infrastructure/monitor"
	"oc-datacenter/infrastructure/storage"

	oclib "cloud.o-forge.io/core/oc-lib"
	"cloud.o-forge.io/core/oc-lib/dbs"
	bookingmodel "cloud.o-forge.io/core/oc-lib/models/booking"
	"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
	"cloud.o-forge.io/core/oc-lib/tools"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

// kubeconfigChannels holds channels waiting for kubeconfig delivery (keyed by executionID).
var kubeconfigChannels sync.Map

// admiraltyConsidersPayload is the PB_CONSIDERS payload emitted after admiralty provisioning.
type admiraltyConsidersPayload struct {
	OriginID     string `json:"origin_id"`
	ExecutionsID string `json:"executions_id"`
	// PeerID is the compute peer (SourcePeerID of the original ArgoKubeEvent).
	// oc-monitord uses it to build a unique considers key per peer, avoiding
	// broadcast collisions when multiple compute peers run in parallel.
	PeerID string  `json:"peer_id,omitempty"`
	Secret string  `json:"secret,omitempty"`
	Error  *string `json:"error,omitempty"`
}

// emitAdmiraltyConsiders publishes a PB_CONSIDERS back to OriginID with the result
// of the admiralty provisioning. secret is the base64-encoded kubeconfig; provErr is nil on success.
// When self is true the origin is the local peer: the payload is emitted directly on
// CONSIDERS_EVENT instead of being routed through PROPALGATION_EVENT.
func emitAdmiraltyConsiders(executionsID, originID, peerID, secret string, provErr error, self bool) {
	var errStr *string
	if provErr != nil {
		s := provErr.Error()
		errStr = &s
	}
	payload, _ := json.Marshal(admiraltyConsidersPayload{
		OriginID:     originID,
		ExecutionsID: executionsID,
		PeerID:       peerID,
		Secret:       secret,
		Error:        errStr,
	})
	if self {
		go tools.NewNATSCaller().SetNATSPub(tools.CONSIDERS_EVENT, tools.NATSResponse{
			FromApp:  "oc-datacenter",
			Datatype: tools.COMPUTE_RESOURCE,
			Method:   int(tools.CONSIDERS_EVENT),
			Payload:  payload,
		})
		return
	}
	b, _ := json.Marshal(&tools.PropalgationMessage{
		DataType: tools.COMPUTE_RESOURCE.EnumIndex(),
		Action:   tools.PB_CONSIDERS,
		Payload:  payload,
	})
	go tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{
		FromApp:  "oc-datacenter",
		Datatype: -1,
		Method:   int(tools.PROPALGATION_EVENT),
		Payload:  b,
	})
}

// AdmiraltySetter carries the execution context for an admiralty pairing.
type AdmiraltySetter struct {
	ExecutionsID string // execution ID, used as the Kubernetes namespace
	NodeName     string // name of the virtual node created by Admiralty on the target cluster
}

func NewAdmiraltySetter(execIDS string) *AdmiraltySetter {
	return &AdmiraltySetter{
		ExecutionsID: execIDS,
	}
}

// InitializeAsSource is called on the peer that acts as the SOURCE cluster (compute provider).
// It creates the AdmiraltySource resource, generates a kubeconfig for the target peer,
// and publishes it on NATS so the target peer can complete its side of the setup.
func (s *AdmiraltySetter) InitializeAsSource(ctx context.Context, localPeerID string, destPeerID string, originID string, self bool, images []string) error {
	logger := oclib.GetLogger()

	// Local execution: no Admiralty resources needed; just emit PB_CONSIDERS.
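	// For illustration, the payload marshalled in this local case looks roughly like
	// {"origin_id":"<originID>","executions_id":"<execution id>","peer_id":"<localPeerID>"}
	// (secret and error are dropped by omitempty here); with self=true it goes straight
	// out on CONSIDERS_EVENT rather than through PROPALGATION_EVENT.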
	if localPeerID == destPeerID {
		emitAdmiraltyConsiders(s.ExecutionsID, originID, localPeerID, "", nil, true)
		return nil
	}

	serv, err := tools.NewKubernetesService(conf.GetConfig().KubeHost+":"+conf.GetConfig().KubePort, conf.GetConfig().KubeCA, conf.GetConfig().KubeCert, conf.GetConfig().KubeData)
	if err != nil {
		return errors.New("InitializeAsSource: failed to create service: " + err.Error())
	}

	// Create the AdmiraltySource resource on this cluster (inlined from the CreateAdmiraltySource controller).
	logger.Info().Msg("Creating AdmiraltySource ns-" + s.ExecutionsID)
	_, err = serv.CreateAdmiraltySource(ctx, s.ExecutionsID)
	if err != nil && !strings.Contains(err.Error(), "already exists") {
		return errors.New("InitializeAsSource: failed to create AdmiraltySource: " + err.Error())
	}

	// Generate a service-account token for the namespace (inlined from the GetAdmiraltyKubeconfig controller).
	token, err := serv.GenerateToken(ctx, s.ExecutionsID, 3600)
	if err != nil {
		return errors.New("InitializeAsSource: failed to generate token for ns-" + s.ExecutionsID + ": " + err.Error())
	}

	kubeconfig, err := buildHostKubeWithToken(token)
	if err != nil {
		return errors.New("InitializeAsSource: " + err.Error())
	}

	b, err := json.Marshal(kubeconfig)
	if err != nil {
		return errors.New("InitializeAsSource: failed to marshal kubeconfig: " + err.Error())
	}
	encodedKubeconfig := base64.StdEncoding.EncodeToString(b)

	kube := models.KubeconfigEvent{
		ExecutionsID:       s.ExecutionsID,
		Kubeconfig:         encodedKubeconfig,
		SourcePeerID:       localPeerID,
		DestPeerID:         destPeerID,
		OriginID:           originID,
		SourceExecutionsID: s.ExecutionsID,
		Images:             images,
	}

	// Publish the kubeconfig on NATS so the target peer can proceed.
	payload, err := json.Marshal(kube)
	if err != nil {
		return errors.New("InitializeAsSource: failed to marshal kubeconfig event: " + err.Error())
	}
	if b, err := json.Marshal(&tools.PropalgationMessage{
		DataType: -1,
		Action:   tools.PB_ADMIRALTY_CONFIG,
		Payload:  payload,
	}); err == nil {
		go tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{
			FromApp:  "oc-datacenter",
			Datatype: tools.COMPUTE_RESOURCE,
			User:     "",
			Method:   int(tools.PROPALGATION_EVENT),
			Payload:  b,
		})
	}

	logger.Info().Msg("InitializeAsSource: kubeconfig published for ns-" + s.ExecutionsID)
	return nil
}

// InitializeAsTarget is called on the peer that acts as the TARGET cluster (scheduler).
// It receives the kubeconfig published by the source peer via NATS, then creates
// the Secret and the AdmiraltyTarget, and polls until the virtual node appears.
// self must be true when the origin peer is the local peer (direct CONSIDERS_EVENT emission).
func (s *AdmiraltySetter) InitializeAsTarget(ctx context.Context, kubeconfigObj models.KubeconfigEvent, self bool) {
	logger := oclib.GetLogger()
	defer kubeconfigChannels.Delete(s.ExecutionsID)

	logger.Info().Msg("InitializeAsTarget: waiting for kubeconfig from source peer ns-" + s.ExecutionsID)
	kubeconfigData := kubeconfigObj.Kubeconfig

	serv, err := tools.NewKubernetesService(conf.GetConfig().KubeHost+":"+conf.GetConfig().KubePort, conf.GetConfig().KubeCA, conf.GetConfig().KubeCert, conf.GetConfig().KubeData)
	if err != nil {
		logger.Error().Msg("InitializeAsTarget: failed to create service: " + err.Error())
		return
	}
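	// Target-side provisioning proceeds in order: namespace, ServiceAccount, Role,
	// RoleBinding, kubeconfig Secret, AdmiraltyTarget, PVCs, then a poll for the
	// virtual node. Every failure path reports the error back to the origin peer
	// through emitAdmiraltyConsiders before returning.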
	// 1. Create the namespace.
	logger.Info().Msg("InitializeAsTarget: creating Namespace " + s.ExecutionsID)
	if err := serv.CreateNamespace(ctx, s.ExecutionsID); err != nil && !strings.Contains(err.Error(), "already exists") {
		logger.Error().Msg("InitializeAsTarget: failed to create namespace: " + err.Error())
		emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, kubeconfigObj.SourcePeerID, "", err, self)
		return
	}

	// 2. Create the ServiceAccount sa-{executionID}.
	logger.Info().Msg("InitializeAsTarget: creating ServiceAccount sa-" + s.ExecutionsID)
	if err := serv.CreateServiceAccount(ctx, s.ExecutionsID); err != nil && !strings.Contains(err.Error(), "already exists") {
		logger.Error().Msg("InitializeAsTarget: failed to create service account: " + err.Error())
		emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, kubeconfigObj.SourcePeerID, "", err, self)
		return
	}

	// 3. Create the Role.
	// The three parallel slices pair up rule by rule: coordination.k8s.io/leases
	// (get, create, update), core/secrets (get), core/pods (patch).
	roleName := "role-" + s.ExecutionsID
	logger.Info().Msg("InitializeAsTarget: creating Role " + roleName)
	if err := serv.CreateRole(ctx, s.ExecutionsID, roleName,
		[][]string{{"coordination.k8s.io"}, {""}, {""}},
		[][]string{{"leases"}, {"secrets"}, {"pods"}},
		[][]string{{"get", "create", "update"}, {"get"}, {"patch"}},
	); err != nil && !strings.Contains(err.Error(), "already exists") {
		logger.Error().Msg("InitializeAsTarget: failed to create role: " + err.Error())
		emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, kubeconfigObj.SourcePeerID, "", err, self)
		return
	}

	// 4. Create the RoleBinding.
	rbName := "rb-" + s.ExecutionsID
	logger.Info().Msg("InitializeAsTarget: creating RoleBinding " + rbName)
	if err := serv.CreateRoleBinding(ctx, s.ExecutionsID, rbName, roleName); err != nil && !strings.Contains(err.Error(), "already exists") {
		logger.Error().Msg("InitializeAsTarget: failed to create role binding: " + err.Error())
		emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, kubeconfigObj.SourcePeerID, "", err, self)
		return
	}

	// Create the Secret from the source peer's kubeconfig (inlined from the CreateKubeSecret controller).
	logger.Info().Msg("InitializeAsTarget: creating Secret ns-" + s.ExecutionsID)
	if _, err := serv.CreateKubeconfigSecret(ctx, kubeconfigData, s.ExecutionsID, kubeconfigObj.SourcePeerID); err != nil {
		logger.Error().Msg("InitializeAsTarget: failed to create kubeconfig secret: " + err.Error())
		emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, kubeconfigObj.SourcePeerID, "", err, self)
		return
	}

	// Create the AdmiraltyTarget resource (inlined from the CreateAdmiraltyTarget controller).
	logger.Info().Msg("InitializeAsTarget: creating AdmiraltyTarget ns-" + s.ExecutionsID)
	resp, err := serv.CreateAdmiraltyTarget(ctx, s.ExecutionsID, kubeconfigObj.SourcePeerID)
	if err != nil || resp == nil {
		logger.Error().Msg(fmt.Sprintf("InitializeAsTarget: failed to create admiralty target: %v", err))
		if err == nil {
			err = fmt.Errorf("CreateAdmiraltyTarget returned nil response")
		}
		emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, kubeconfigObj.SourcePeerID, "", err, self)
		return
	}

	// 5. Provision PVCs in the target namespace so Admiralty shadow pods can mount them.
	// The claim names must match what oc-monitord generates: {storageName}-{sourceExecutionsID}.
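	// For example (names are illustrative only), a storage booked as "minio" under source
	// execution "abc123" must show up here as claim "minio-abc123", exactly the name
	// oc-monitord writes into the workflow spec.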
	if kubeconfigObj.SourceExecutionsID != "" {
		logger.Info().Msg("InitializeAsTarget: provisioning PVCs for source exec " + kubeconfigObj.SourceExecutionsID)
		provisionPVCsForTarget(ctx, s.ExecutionsID, kubeconfigObj.SourceExecutionsID, kubeconfigObj.SourcePeerID)
	}

	// Poll until the virtual node appears (inlined from the GetNodeReady controller).
	logger.Info().Msg("InitializeAsTarget: waiting for virtual node ns-" + s.ExecutionsID)
	s.waitForNode(ctx, serv, kubeconfigObj.SourcePeerID)

	emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, kubeconfigObj.SourcePeerID, kubeconfigData, nil, self)
}

// provisionPVCsForTarget creates PVCs in the Admiralty target namespace for all local
// storages booked under sourceExecutionsID. The claim names use sourceExecutionsID as
// suffix so they match what oc-monitord generates in the workflow spec.
func provisionPVCsForTarget(ctx context.Context, targetNS string, sourceExecutionsID string, peerID string) {
	logger := oclib.GetLogger()
	res := oclib.NewRequest(oclib.LibDataEnum(oclib.BOOKING), "", peerID, []string{}, nil).
		Search(&dbs.Filters{
			And: map[string][]dbs.Filter{
				"executions_id": {{Operator: dbs.EQUAL.String(), Value: sourceExecutionsID}},
				"resource_type": {{Operator: dbs.EQUAL.String(), Value: tools.LIVE_STORAGE.EnumIndex()}},
			},
		}, "", false)
	if res.Err != "" || len(res.Data) == 0 {
		return
	}
	for _, dbo := range res.Data {
		b, ok := dbo.(*bookingmodel.Booking)
		if !ok {
			continue
		}
		storageName := storage.ResolveStorageName(b.ResourceID, peerID)
		if storageName == "" {
			continue
		}
		event := storage.PVCProvisionEvent{
			ExecutionsID: targetNS,
			StorageID:    b.ResourceID,
			StorageName:  storageName,
			SourcePeerID: peerID,
			DestPeerID:   peerID,
			OriginID:     peerID,
		}
		// Use sourceExecutionsID as the claim-name suffix so it matches oc-monitord's claimName.
		setter := storage.NewPVCSetterWithClaimSuffix(b.ResourceID, sourceExecutionsID)
		logger.Info().Msgf("InitializeAsTarget: provisioning PVC %s in ns %s", storage.ClaimName(storageName, sourceExecutionsID), targetNS)
		setter.InitializeAsSource(ctx, event, true)
	}
}

// waitForNode polls GetOneNode until the Admiralty virtual node appears on this cluster.
func (s *AdmiraltySetter) waitForNode(ctx context.Context, serv *tools.KubernetesService, sourcePeerID string) {
	logger := oclib.GetLogger()
	for i := range 5 {
		time.Sleep(10 * time.Second)
		node, err := serv.GetOneNode(ctx, s.ExecutionsID, sourcePeerID)
		if err == nil && node != nil {
			s.NodeName = node.Name
			logger.Info().Msg("waitForNode: node ready: " + s.NodeName)
			return
		}
		if i == 4 {
			logger.Error().Msg("waitForNode: node never appeared for ns-" + s.ExecutionsID)
			return
		}
		logger.Info().Msg("waitForNode: node not ready yet, retrying...")
	}
}
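// Note: with 5 attempts spaced 10 seconds apart, waitForNode gives the virtual node
// roughly 50 seconds to appear. InitializeAsTarget still emits PB_CONSIDERS with the
// kubeconfig afterwards, so the origin peer gets an answer even when the node never
// shows up.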
// TeardownAsTarget destroys all Admiralty resources created by InitializeAsTarget on the
// target (scheduler) cluster: the AdmiraltyTarget CRD, the ServiceAccount, the Role,
// the RoleBinding, and the namespace (namespace deletion cascades the rest).
func (s *AdmiraltySetter) TeardownAsTarget(ctx context.Context, originID string) {
	logger := oclib.GetLogger()
	serv, err := tools.NewKubernetesService(conf.GetConfig().KubeHost+":"+conf.GetConfig().KubePort, conf.GetConfig().KubeCA, conf.GetConfig().KubeCert, conf.GetConfig().KubeData)
	if err != nil {
		logger.Error().Msg("TeardownAsTarget: failed to create k8s service: " + err.Error())
		return
	}
	if err := serv.DeleteNamespace(ctx, s.ExecutionsID, func() {
		logger.Info().Msg("TeardownAsTarget: namespace " + s.ExecutionsID + " deleted")
		defer monitor.StreamRegistry.Register(s.ExecutionsID)
	}); err != nil {
		logger.Error().Msg("TeardownAsTarget: " + err.Error())
		return
	}
}

// TeardownAsSource destroys all Admiralty resources created by InitializeAsSource on the
// source (compute) cluster: the AdmiraltySource CRD, the ServiceAccount, and the namespace.
// The namespace deletion cascades the Role and RoleBinding.
func (s *AdmiraltySetter) TeardownAsSource(ctx context.Context) {
	logger := oclib.GetLogger()
	host := conf.GetConfig().KubeHost + ":" + conf.GetConfig().KubePort
	ca := conf.GetConfig().KubeCA
	cert := conf.GetConfig().KubeCert
	data := conf.GetConfig().KubeData

	// Delete the AdmiraltySource CRD via the dynamic client.
	gvrSources := schema.GroupVersionResource{
		Group:    "multicluster.admiralty.io",
		Version:  "v1alpha1",
		Resource: "sources",
	}
	if dyn, err := tools.NewDynamicClient(host, ca, cert, data); err != nil {
		logger.Error().Msg("TeardownAsSource: failed to create dynamic client: " + err.Error())
	} else if err := dyn.Resource(gvrSources).Namespace(s.ExecutionsID).Delete(
		ctx, "source-"+s.ExecutionsID, metav1.DeleteOptions{},
	); err != nil {
		logger.Error().Msg("TeardownAsSource: failed to delete AdmiraltySource: " + err.Error())
	}

	// Delete the namespace (cascades SA, Role, RoleBinding).
	serv, err := tools.NewKubernetesService(host, ca, cert, data)
	if err != nil {
		logger.Error().Msg("TeardownAsSource: failed to create k8s service: " + err.Error())
		return
	}
	if err := serv.Set.CoreV1().Namespaces().Delete(ctx, s.ExecutionsID, metav1.DeleteOptions{}); err != nil {
		logger.Error().Msg("TeardownAsSource: failed to delete namespace: " + err.Error())
		return
	}
	logger.Info().Msg("TeardownAsSource: namespace " + s.ExecutionsID + " deleted")
}

// buildHostKubeWithToken builds a kubeconfig pointing to this peer's cluster,
// authenticated with the provided service-account token.
func buildHostKubeWithToken(token string) (*models.KubeConfigValue, error) {
	if len(token) == 0 {
		return nil, fmt.Errorf("buildHostKubeWithToken: empty token")
	}
	apiHost := conf.GetConfig().KubeExternalHost
	if apiHost == "" {
		apiHost = conf.GetConfig().KubeHost
	}
	encodedCA := conf.GetConfig().KubeCA
	return &models.KubeConfigValue{
		APIVersion:     "v1",
		CurrentContext: "default",
		Kind:           "Config",
		Preferences:    struct{}{},
		Clusters: []models.KubeconfigNamedCluster{{
			Name: "default",
			Cluster: models.KubeconfigCluster{
				Server:                   "https://" + apiHost + ":6443",
				CertificateAuthorityData: encodedCA,
			},
		}},
		Contexts: []models.KubeconfigNamedContext{{
			Name:    "default",
			Context: models.KubeconfigContext{Cluster: "default", User: "default"},
		}},
		Users: []models.KubeconfigUser{{
			Name: "default",
			User: models.KubeconfigUserKeyPair{Token: token},
		}},
	}, nil
}
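// Illustrative use of buildHostKubeWithToken (this mirrors InitializeAsSource above;
// variable names are placeholders): the kubeconfig is JSON-encoded and base64-wrapped
// before being shipped to the target peer inside KubeconfigEvent.Kubeconfig.
//
//	kc, err := buildHostKubeWithToken(token)
//	if err != nil {
//		return err
//	}
//	raw, _ := json.Marshal(kc)
//	encoded := base64.StdEncoding.EncodeToString(raw)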
// TeardownIfRemote triggers Admiralty TeardownAsTarget only when at least one
// compute booking for the execution is on a remote peer.
// Local executions do not involve Admiralty.
func (s *AdmiraltySetter) TeardownIfRemote(exec *workflow_execution.WorkflowExecution, selfPeerID string) {
	logger := oclib.GetLogger()
	res := oclib.NewRequest(oclib.LibDataEnum(oclib.BOOKING), "", selfPeerID, []string{}, nil).
		Search(&dbs.Filters{
			And: map[string][]dbs.Filter{
				"executions_id": {{Operator: dbs.EQUAL.String(), Value: exec.ExecutionsID}},
				"resource_type": {{Operator: dbs.EQUAL.String(), Value: tools.COMPUTE_RESOURCE.EnumIndex()}},
			},
		}, "", false)
	if res.Err != "" || len(res.Data) == 0 {
		return
	}
	for _, dbo := range res.Data {
		b, ok := dbo.(*bookingmodel.Booking)
		if !ok {
			continue
		}
		if b.DestPeerID != selfPeerID {
			logger.Info().Msgf("InfraTeardown: Admiralty teardown exec=%s (remote peer=%s)", exec.ExecutionsID, b.DestPeerID)
			s.TeardownAsTarget(context.Background(), selfPeerID)
			return // one teardown per execution is enough
		}
	}
}
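// Illustrative pairing flow (hypothetical caller code, not part of this package): the
// compute peer provisions its side and publishes the kubeconfig, and the scheduler peer
// reacts once the KubeconfigEvent arrives over NATS. Variable names such as execID,
// localPeerID, destPeerID, originID, self, images, event and selfPeerID are placeholders.
//
//	// On the compute (source) peer:
//	src := NewAdmiraltySetter(execID)
//	if err := src.InitializeAsSource(ctx, localPeerID, destPeerID, originID, self, images); err != nil {
//		// handle or log the provisioning error
//	}
//
//	// On the scheduler (target) peer, in the NATS handler for the kubeconfig event:
//	dst := NewAdmiraltySetter(event.ExecutionsID)
//	dst.InitializeAsTarget(ctx, event, event.OriginID == selfPeerID)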