From 3059e7fcbce18b421fb13fb35da7c3f96d90926f Mon Sep 17 00:00:00 2001
From: mr
Date: Mon, 2 Feb 2026 14:30:01 +0100
Subject: [PATCH] Update Oclib for event generation

---
 go.mod                               |  2 +-
 go.sum                               |  2 +
 models/template.go                   |  8 +--
 workflow_builder/admiralty_setter.go | 89 ++++++++++++++--------------
 workflow_builder/argo_builder.go     | 39 ++++++------
 5 files changed, 71 insertions(+), 69 deletions(-)

diff --git a/go.mod b/go.mod
index ab7db4e..d1a78cf 100644
--- a/go.mod
+++ b/go.mod
@@ -5,7 +5,7 @@ go 1.23.1
 toolchain go1.23.3
 
 require (
-	cloud.o-forge.io/core/oc-lib v0.0.0-20260114125749-fa5b7543332d
+	cloud.o-forge.io/core/oc-lib v0.0.0-20260129122033-186ba3e689c7
 	github.com/akamensky/argparse v1.4.0
 	github.com/google/uuid v1.6.0
 	github.com/goraz/onion v0.1.3
diff --git a/go.sum b/go.sum
index 788f750..988a8d5 100644
--- a/go.sum
+++ b/go.sum
@@ -18,6 +18,8 @@ cloud.o-forge.io/core/oc-lib v0.0.0-20260113155325-5cdfc28d2f51 h1:jlSEprNaUBe62
 cloud.o-forge.io/core/oc-lib v0.0.0-20260113155325-5cdfc28d2f51/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
 cloud.o-forge.io/core/oc-lib v0.0.0-20260114125749-fa5b7543332d h1:6oGSN4Fb+H7LNVbUEN7vaDtWBHZTdd2Y1BkBdZ7MLXE=
 cloud.o-forge.io/core/oc-lib v0.0.0-20260114125749-fa5b7543332d/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
+cloud.o-forge.io/core/oc-lib v0.0.0-20260129122033-186ba3e689c7 h1:NRFGRqN+j5g3DrtXMYN5T5XSYICG+OU2DisjBdID3j8=
+cloud.o-forge.io/core/oc-lib v0.0.0-20260129122033-186ba3e689c7/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
 github.com/akamensky/argparse v1.4.0 h1:YGzvsTqCvbEZhL8zZu2AiA5nq805NZh75JNj4ajn1xc=
 github.com/akamensky/argparse v1.4.0/go.mod h1:S5kwC7IuDcEr5VeXtGPRVZ5o/FdhcMlQz4IZQuw64xA=
diff --git a/models/template.go b/models/template.go
index 19b4d9e..eeac2ea 100644
--- a/models/template.go
+++ b/models/template.go
@@ -106,19 +106,19 @@ func (template *Template) CreateEventContainer(exec *workflow_execution.Workflow
 	container := Container{Image: "natsio/nats-box"}
 	container.Command = []string{"sh", "-c"} // all is bash
 
-	var event *native_tools.WorkflowEventParams
+	var event native_tools.WorkflowEventParams
 	b, err := json.Marshal(nt.Params)
 	if err != nil {
 		fmt.Println(err)
 		return
 	}
-	err = json.Unmarshal(b, event)
+	err = json.Unmarshal(b, &event)
 	if err != nil {
 		fmt.Println(err)
 		return
 	}
-	if event != nil {
-		container.Args = append(container.Args, "nats pub --server "+conf.GetConfig().NatsURL+":4222 "+tools.WORKFLOW_EVENT.GenerateKey(tools.WORKFLOW_EVENT.String())+" '{\"workflow_id\":\""+event.WorkflowResourceID+"\"}'")
+	if event.WorkflowResourceID != "" {
+		container.Args = append(container.Args, "nats pub --server "+conf.GetConfig().NatsURL+":4222 "+tools.WORKFLOW_EVENT.GenerateKey()+" '{\"workflow_id\":\""+event.WorkflowResourceID+"\"}'")
 		container.Args = []string{strings.Join(container.Args, " ")}
 		template.Container = container
 	}
diff --git a/workflow_builder/admiralty_setter.go b/workflow_builder/admiralty_setter.go
index edc897c..4862df0 100644
--- a/workflow_builder/admiralty_setter.go
+++ b/workflow_builder/admiralty_setter.go
@@ -14,61 +14,60 @@ import (
 	tools "cloud.o-forge.io/core/oc-lib/tools"
 )
 
-
 type AdmiraltySetter struct {
-	Id string // ID to identify the execution, correspond to workflow_executions id
+	Id       string // ID to identify the execution, correspond to workflow_executions id
 	NodeName string // Allows to retrieve the name of the node used for this execution on each peer {"peerId": "nodeName"}
 }
 
-func (s *AdmiraltySetter) InitializeAdmiralty(localPeerID string,remotePeerID string) error {
-	
+func (s *AdmiraltySetter) InitializeAdmiralty(localPeerID string, remotePeerID string) error {
+
 	logger := logs.GetLogger()
 
-	data := oclib.NewRequest(oclib.LibDataEnum(oclib.PEER),"",localPeerID,nil,nil).LoadOne(remotePeerID)
+	data := oclib.NewRequest(oclib.LibDataEnum(oclib.PEER), "", localPeerID, nil, nil).LoadOne(remotePeerID)
 	if data.Code != 200 {
 		logger.Error().Msg("Error while trying to instantiate remote peer " + remotePeerID)
 		return fmt.Errorf(data.Err)
 	}
 	remotePeer := data.ToPeer()
 
-	data = oclib.NewRequest(oclib.LibDataEnum(oclib.PEER),"",localPeerID,nil,nil).LoadOne(localPeerID)
-	if data.Code != 200 {
-		logger.Error().Msg("Error while trying to instantiate local peer " + remotePeerID)
-		return fmt.Errorf(data.Err)
-	}
-	localPeer := data.ToPeer()
+	data = oclib.NewRequest(oclib.LibDataEnum(oclib.PEER), "", localPeerID, nil, nil).LoadOne(localPeerID)
+	if data.Code != 200 {
+		logger.Error().Msg("Error while trying to instantiate local peer " + localPeerID)
+		return fmt.Errorf(data.Err)
+	}
+	localPeer := data.ToPeer()
 
-	caller := tools.NewHTTPCaller(
-		map[tools.DataType]map[tools.METHOD]string{
-			tools.ADMIRALTY_SOURCE: {
-				tools.POST :"/:id",
-			},
-			tools.ADMIRALTY_KUBECONFIG: {
-				tools.GET:"/:id",
-			},
-			tools.ADMIRALTY_SECRET: {
-				tools.POST:"/:id/" + remotePeerID,
-			},
-			tools.ADMIRALTY_TARGET: {
-				tools.POST:"/:id/" + remotePeerID,
-			},
-			tools.ADMIRALTY_NODES: {
-				tools.GET:"/:id/" + remotePeerID,
-			},
-		},
-	)
-
+	caller := tools.NewHTTPCaller(
+		map[tools.DataType]map[tools.METHOD]string{
+			tools.ADMIRALTY_SOURCE: {
+				tools.POST: "/:id",
+			},
+			tools.ADMIRALTY_KUBECONFIG: {
+				tools.GET: "/:id",
+			},
+			tools.ADMIRALTY_SECRET: {
+				tools.POST: "/:id/" + remotePeerID,
+			},
+			tools.ADMIRALTY_TARGET: {
+				tools.POST: "/:id/" + remotePeerID,
+			},
+			tools.ADMIRALTY_NODES: {
+				tools.GET: "/:id/" + remotePeerID,
+			},
+		},
+	)
+
 	logger.Info().Msg("\n\n Creating the Admiralty Source on " + remotePeerID + " ns-" + s.Id)
-	s.callRemoteExecution(remotePeer, []int{http.StatusCreated, http.StatusConflict},caller, s.Id, tools.ADMIRALTY_SOURCE, tools.POST, nil, true)
+	s.callRemoteExecution(remotePeer, []int{http.StatusCreated, http.StatusConflict}, caller, s.Id, tools.ADMIRALTY_SOURCE, tools.POST, nil, true)
 	logger.Info().Msg("\n\n Retrieving kubeconfig with the secret on " + remotePeerID + " ns-" + s.Id)
 	kubeconfig := s.getKubeconfig(remotePeer, caller)
 	logger.Info().Msg("\n\n Creating a secret from the kubeconfig " + localPeerID + " ns-" + s.Id)
-	s.callRemoteExecution(localPeer, []int{http.StatusCreated}, caller,s.Id, tools.ADMIRALTY_SECRET, tools.POST,kubeconfig, true)
-	logger.Info().Msg("\n\n Creating the Admiralty Target on " + localPeerID + " in namespace " + s.Id )
-	s.callRemoteExecution(localPeer,[]int{http.StatusCreated, http.StatusConflict},caller,s.Id,tools.ADMIRALTY_TARGET,tools.POST, nil, true)
+	s.callRemoteExecution(localPeer, []int{http.StatusCreated}, caller, s.Id, tools.ADMIRALTY_SECRET, tools.POST, kubeconfig, true)
+	logger.Info().Msg("\n\n Creating the Admiralty Target on " + localPeerID + " in namespace " + s.Id)
+	s.callRemoteExecution(localPeer, []int{http.StatusCreated, http.StatusConflict}, caller, s.Id, tools.ADMIRALTY_TARGET, tools.POST, nil, true)
 	logger.Info().Msg("\n\n Checking for the creation of the admiralty node on " + localPeerID + " ns-" + s.Id)
" + localPeerID + " ns-" + s.Id) - s.checkNodeStatus(localPeer,caller) - + s.checkNodeStatus(localPeer, caller) + return nil } @@ -90,11 +89,11 @@ func (s *AdmiraltySetter) getKubeconfig(peer *peer.Peer, caller *tools.HTTPCalle return kubedata } -func (*AdmiraltySetter) callRemoteExecution(peer *peer.Peer, expectedCode []int,caller *tools.HTTPCaller, dataID string, dt tools.DataType, method tools.METHOD, body interface{}, panicCode bool) { +func (*AdmiraltySetter) callRemoteExecution(peer *peer.Peer, expectedCode []int, caller *tools.HTTPCaller, dataID string, dt tools.DataType, method tools.METHOD, body interface{}, panicCode bool) { l := utils.GetLogger() _, err := peer.LaunchPeerExecution(peer.UUID, dataID, dt, method, body, caller) if err != nil { - l.Error().Msg("Error when executing on peer at" + peer.Url) + l.Error().Msg("Error when executing on peer at" + peer.APIUrl) l.Error().Msg(err.Error()) panic(0) } @@ -111,7 +110,7 @@ func (*AdmiraltySetter) callRemoteExecution(peer *peer.Peer, expectedCode []int, } -func (s *AdmiraltySetter) storeNodeName(caller *tools.HTTPCaller){ +func (s *AdmiraltySetter) storeNodeName(caller *tools.HTTPCaller) { var data map[string]interface{} if resp, ok := caller.LastResults["body"]; ok { json.Unmarshal(resp.([]byte), &data) @@ -128,10 +127,10 @@ func (s *AdmiraltySetter) storeNodeName(caller *tools.HTTPCaller){ } } -func (s *AdmiraltySetter) checkNodeStatus(localPeer *peer.Peer, caller *tools.HTTPCaller){ - for i := range(5) { +func (s *AdmiraltySetter) checkNodeStatus(localPeer *peer.Peer, caller *tools.HTTPCaller) { + for i := range 5 { time.Sleep(10 * time.Second) // let some time for kube to generate the node - s.callRemoteExecution(localPeer,[]int{http.StatusOK},caller,s.Id,tools.ADMIRALTY_NODES,tools.GET, nil, false) + s.callRemoteExecution(localPeer, []int{http.StatusOK}, caller, s.Id, tools.ADMIRALTY_NODES, tools.GET, nil, false) if caller.LastResults["code"] == 200 { s.storeNodeName(caller) return @@ -142,5 +141,5 @@ func (s *AdmiraltySetter) checkNodeStatus(localPeer *peer.Peer, caller *tools.HT } logger.Info().Msg("Could not verify that node is up. 
 	}
-
-}
\ No newline at end of file
+
+}
diff --git a/workflow_builder/argo_builder.go b/workflow_builder/argo_builder.go
index 27fbb3e..1725254 100644
--- a/workflow_builder/argo_builder.go
+++ b/workflow_builder/argo_builder.go
@@ -17,6 +17,7 @@ import (
 	oclib "cloud.o-forge.io/core/oc-lib"
 	"cloud.o-forge.io/core/oc-lib/logs"
 	"cloud.o-forge.io/core/oc-lib/models/common/enum"
+	"cloud.o-forge.io/core/oc-lib/models/peer"
 	"cloud.o-forge.io/core/oc-lib/models/resources"
 	"cloud.o-forge.io/core/oc-lib/models/resources/native_tools"
 	w "cloud.o-forge.io/core/oc-lib/models/workflow"
@@ -194,30 +195,30 @@ func (b *ArgoBuilder) createArgoTemplates(
 	exec *workflow_execution.WorkflowExecution,
 	namespace string,
 	id string,
-	processing resources.ResourceInterface,
+	obj resources.ResourceInterface,
 	volumes []VolumeMount,
 	firstItems []string,
 	lastItems []string) ([]VolumeMount, []string, []string) {
-	_, firstItems, lastItems = b.addTaskToArgo(exec, b.Workflow.getDag(), id, processing, firstItems, lastItems)
-	template := &Template{Name: getArgoName(processing.GetName(), id)}
+	_, firstItems, lastItems = b.addTaskToArgo(exec, b.Workflow.getDag(), id, obj, firstItems, lastItems)
+	template := &Template{Name: getArgoName(obj.GetName(), id)}
 	logger.Info().Msg(fmt.Sprint("Creating template for", template.Name))
-	isReparted, peerId := b.isProcessingReparted(processing, id)
-	if processing.GetType() == tools.PROCESSING_RESOURCE.String() {
-		template.CreateContainer(exec, processing.(*resources.ProcessingResource), b.Workflow.getDag())
-	} else if processing.GetType() == tools.NATIVE_TOOL.String() {
-		template.CreateEventContainer(exec, processing.(*resources.NativeTool), b.Workflow.getDag())
+	isReparted, peer := b.isReparted(obj, id)
+	if obj.GetType() == tools.PROCESSING_RESOURCE.String() {
+		template.CreateContainer(exec, obj.(*resources.ProcessingResource), b.Workflow.getDag())
+	} else if obj.GetType() == tools.NATIVE_TOOL.String() {
+		template.CreateEventContainer(exec, obj.(*resources.NativeTool), b.Workflow.getDag())
 	}
 	if isReparted {
-		logger.Debug().Msg("Reparted processing, on " + peerId)
-		b.RemotePeers = append(b.RemotePeers, peerId)
-		template.AddAdmiraltyAnnotations(peerId)
+		logger.Debug().Msg("Reparted processing, on " + peer.GetID())
+		b.RemotePeers = append(b.RemotePeers, peer.GetID())
+		template.AddAdmiraltyAnnotations(peer.GetID())
 	}
 	// get datacenter from the processing
-	if processing.GetType() == tools.PROCESSING_RESOURCE.String() && processing.(*resources.ProcessingResource).IsService {
-		b.CreateService(exec, id, processing)
+	if obj.GetType() == tools.PROCESSING_RESOURCE.String() && obj.(*resources.ProcessingResource).IsService {
+		b.CreateService(exec, id, obj)
 		template.Metadata.Labels = make(map[string]string)
-		template.Metadata.Labels["app"] = "oc-service-" + processing.GetName() // Construct the template for the k8s service and add a link in graph between k8s service and processing
+		template.Metadata.Labels["app"] = "oc-service-" + obj.GetName() // Construct the template for the k8s service and add a link in graph between k8s service and processing
 	}
 	volumes = b.addStorageAnnotations(exec, id, template, namespace, volumes)
@@ -470,7 +471,7 @@ func getArgoName(raw_name string, component_id string) (formatedName string) {
 
 // Verify if a processing resource is attached to another Compute than the one hosting
 // the current Open Cloud instance. If true return the peer ID to contact
-func (b *ArgoBuilder) isProcessingReparted(processing resources.ResourceInterface, graphID string) (bool, string) {
+func (b *ArgoBuilder) isReparted(processing resources.ResourceInterface, graphID string) (bool, *peer.Peer) {
 	computeAttached := b.retrieveProcessingCompute(graphID)
 	if computeAttached == nil {
 		logger.Error().Msg("No compute was found attached to processing " + processing.GetName() + " : " + processing.GetID())
@@ -481,22 +482,22 @@ func (b *ArgoBuilder) isProcessingReparted(processing resources.ResourceInterfac
 	req := oclib.NewRequest(oclib.LibDataEnum(oclib.PEER), "", "", nil, nil)
 	if req == nil {
 		fmt.Println("TODO : handle error when trying to create a request on the Peer Collection")
-		return false, ""
+		return false, nil
 	}
 
 	res := req.LoadOne(computeAttached.CreatorID)
 	if res.Err != "" {
 		fmt.Print("TODO : handle error when requesting PeerID")
 		fmt.Print(res.Err)
-		return false, ""
+		return false, nil
 	}
 
-	peer := *res.ToPeer()
+	peer := res.ToPeer()
 
 	isNotReparted := peer.State == 1
 	logger.Info().Msg(fmt.Sprint("Result IsMySelf for ", peer.UUID, " : ", isNotReparted))
 
-	return !isNotReparted, peer.UUID
+	return !isNotReparted, peer
 }
 
 func (b *ArgoBuilder) retrieveProcessingCompute(graphID string) *resources.ComputeResource {
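-- 
Reviewer note (not part of the commit): the models/template.go hunk is the heart of the change. CreateEventContainer now unmarshals the native tool parameters into a WorkflowEventParams value instead of a nil pointer, and only builds the nats-box publish command when a WorkflowResourceID is present. Below is a minimal, self-contained sketch of that logic; the NATS URL, the event key, the JSON tag and the example ID are stand-ins, not values taken from oc-lib.

package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// WorkflowEventParams mirrors only the field the patch reads from
// native_tools.WorkflowEventParams; the json tag here is assumed.
type WorkflowEventParams struct {
	WorkflowResourceID string `json:"workflow_resource_id"`
}

func main() {
	// Stand-ins for conf.GetConfig().NatsURL and tools.WORKFLOW_EVENT.GenerateKey().
	natsURL := "nats://nats.example.local"
	eventKey := "workflow.event"

	raw := []byte(`{"workflow_resource_id":"wf-123"}`)

	// Unmarshal into a value, as the patch now does: the old code passed a nil
	// *WorkflowEventParams to json.Unmarshal, which always returns an error and
	// never fills the struct.
	var event WorkflowEventParams
	if err := json.Unmarshal(raw, &event); err != nil {
		fmt.Println(err)
		return
	}

	// Same guard as the patch: only emit the publish command when an ID is present.
	if event.WorkflowResourceID != "" {
		args := []string{"nats pub --server " + natsURL + ":4222 " + eventKey +
			" '{\"workflow_id\":\"" + event.WorkflowResourceID + "\"}'"}
		fmt.Println(strings.Join(args, " "))
	}
}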