Files
oc-lib/tools/nats_caller.go
mr a62fbc6c7a Workflow lifecycle events + resource instance duration tracking
- Add WorkflowLifecycleEvent + StepMetric to tools/workflow_lifecycle.go
- Add WORKFLOW_STARTED_EVENT, WORKFLOW_STEP_DONE_EVENT, WORKFLOW_DONE_EVENT NATS methods
- ResourceInstance.UpdateAverageDuration for AverageDurationS running average
- Support Steps recap in WORKFLOW_DONE_EVENT for catch-up by oc-scheduler/oc-catalog

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-20 10:30:30 +01:00

171 lines
4.8 KiB
Go

package tools
import (
"encoding/json"
"fmt"
"strings"
"sync"
"time"
"cloud.o-forge.io/core/oc-lib/config"
"cloud.o-forge.io/core/oc-lib/logs"
"github.com/nats-io/nats.go"
"github.com/rs/zerolog"
)
// NATSResponse is the envelope exchanged over NATS: SetNATSPub JSON-encodes it
// as the message body and listenForChange decodes each received message into it
// before handing it to the registered handler.
type NATSResponse struct {
// FromApp identifies the sending application (logged on receipt).
FromApp string `json:"from_app"`
// Datatype is the kind of data the message concerns (logged on receipt).
Datatype DataType `json:"datatype"`
User string `json:"user"`
Groups []string `json:"groups"`
// Method is a raw int; presumably a NATSMethod value — TODO confirm with producers.
Method int `json:"method"`
SearchAttr string `json:"search_attr"`
// Payload carries the message body; encoding/json encodes []byte as a base64 string.
Payload []byte `json:"payload"`
}
// NATSMethod enumerates the kinds of message exchanged over the NATS server.
// Each value has a human-readable name (String) and a NATS subject derived
// from that name (GenerateKey).
type NATSMethod int

// meths holds the human-readable name of each NATSMethod, indexed by the
// method's value. It must list names in exactly the same order as the const
// block below. The names are also the source of the NATS subjects (see
// GenerateKey), so the "propalgation" spelling is part of the wire format and
// must not be corrected without coordinating every publisher and listener.
var meths = []string{"remove execution", "create execution", "planner execution", "discovery",
	"workflow event", "argo kube event", "create resource", "remove resource",
	"propalgation event", "search event", "confirm event",
	"considers event", "admiralty config event", "minio config event",
	"workflow started event", "workflow step done event", "workflow done event",
}

const (
	REMOVE_EXECUTION NATSMethod = iota
	CREATE_EXECUTION
	PLANNER_EXECUTION
	DISCOVERY
	WORKFLOW_EVENT
	ARGO_KUBE_EVENT
	CREATE_RESOURCE
	REMOVE_RESOURCE
	PROPALGATION_EVENT
	SEARCH_EVENT
	CONFIRM_EVENT
	CONSIDERS_EVENT
	ADMIRALTY_CONFIG_EVENT
	MINIO_CONFIG_EVENT
	// Workflow lifecycle events emitted by oc-monitord.
	// oc-scheduler listens to STARTED and DONE to maintain WorkflowExecution state.
	// oc-datacenter listens to STEP_DONE and DONE to close bookings and tear down infra.
	WORKFLOW_STARTED_EVENT
	WORKFLOW_STEP_DONE_EVENT
	WORKFLOW_DONE_EVENT
)

// String returns the human-readable name of n. Values outside the known
// range (including the -1 sentinel returned by NameToMethod) yield
// "NATSMethod(<n>)" instead of panicking with an index out of range.
func (n NATSMethod) String() string {
	if n < 0 || int(n) >= len(meths) {
		return fmt.Sprintf("NATSMethod(%d)", int(n))
	}
	return meths[n]
}

// NameToMethod returns the NATSMethod whose name matches name,
// case-insensitively. Underscores in name are treated as spaces, so a subject
// produced by GenerateKey maps back to its method.
//
// An exact name match wins. Otherwise, for backward compatibility, the first
// method in enum order whose name contains name is returned (e.g. "minio"
// yields MINIO_CONFIG_EVENT). It returns -1 when name is empty or nothing
// matches; an empty name used to match REMOVE_EXECUTION by accident.
func NameToMethod(name string) NATSMethod {
	wanted := strings.ToLower(strings.ReplaceAll(name, "_", " "))
	if wanted == "" {
		return -1
	}
	for i, m := range meths {
		if strings.EqualFold(m, wanted) {
			return NATSMethod(i)
		}
	}
	// Legacy fallback: substring match, first hit in enum order.
	for i, m := range meths {
		if strings.Contains(strings.ToLower(m), wanted) {
			return NATSMethod(i)
		}
	}
	return -1
}

// GenerateKey returns the NATS subject for n: its name with spaces replaced by
// underscores (e.g. WORKFLOW_DONE_EVENT -> "workflow_done_event").
func (n NATSMethod) GenerateKey() string {
	return strings.ReplaceAll(n.String(), " ", "_")
}
// natsCaller bundles the helpers that publish to and listen on the NATS server
// configured through config.GetConfig().NATSUrl. It holds no state of its own.
type natsCaller struct{}

// NewNATSCaller returns a new, stateless NATS caller.
func NewNATSCaller() *natsCaller {
	caller := new(natsCaller)
	return caller
}
// ListenNats connects to the NATS server at config.GetConfig().NATSUrl and
// starts one listener goroutine per entry of execs. Each listener subscribes to
// the subject of its NATSMethod (see GenerateKey) and passes every decoded
// NATSResponse to the associated handler.
//
// If NATSUrl is empty it logs an error and returns immediately. Connection
// failures are logged (with the underlying error) and retried every minute.
// Once connected, it blocks until every listener goroutine has returned, then
// closes the connection.
func (s *natsCaller) ListenNats(execs map[NATSMethod]func(NATSResponse)) {
	log := logs.GetLogger()
	if config.GetConfig().NATSUrl == "" {
		log.Error().Msg(" -> NATS_SERVER is not set")
		return
	}

	// Phase 1: connect, retrying until the server is reachable.
	var nc *nats.Conn
	for {
		var err error
		nc, err = nats.Connect(config.GetConfig().NATSUrl)
		if err == nil {
			break
		}
		log.Error().Err(err).Msg("Could not connect to NATS")
		time.Sleep(1 * time.Minute)
	}
	defer nc.Close()

	// Phase 2: one listener per method; each calls wg.Done when it returns.
	var wg sync.WaitGroup
	wg.Add(len(execs))
	for method, handler := range execs {
		go s.listenForChange(log, nc, method, handler, &wg)
	}
	wg.Wait()
}
// SetNATSPub publishes data, JSON-encoded, on the NATS subject derived from
// method (see GenerateKey) on the server at config.GetConfig().NATSUrl.
//
// It returns "" on success, or a " -> ..." description when NATSUrl is unset
// or data cannot be JSON-encoded. Connection and publish failures are printed
// and retried every minute, so the call blocks until the message is published.
//
// The payload is encoded once, before any connection is attempted. Each
// attempt opens and closes its own connection, so a failed attempt does not
// keep its connection open while the next retry runs.
func (o *natsCaller) SetNATSPub(method NATSMethod, data NATSResponse) string {
	if config.GetConfig().NATSUrl == "" {
		return " -> NATS_SERVER is not set"
	}
	js, err := json.Marshal(data)
	if err != nil {
		fmt.Println("NATS Marshal err", err)
		return " -> " + err.Error()
	}
	subject := method.GenerateKey()
	for !o.publishOnce(subject, js) {
		time.Sleep(1 * time.Minute)
	}
	fmt.Println("Published on", subject)
	return ""
}

// publishOnce makes a single publish attempt of payload on subject over a
// fresh connection, which is closed before returning. It reports whether the
// publish call succeeded; failures are printed.
func (o *natsCaller) publishOnce(subject string, payload []byte) bool {
	nc, err := nats.Connect(config.GetConfig().NATSUrl)
	if err != nil {
		fmt.Println("NATS Connect err", err)
		return false
	}
	defer nc.Close()
	if err := nc.Publish(subject, payload); err != nil {
		fmt.Println("Publish Failed", err)
		return false
	}
	return true
}
// listenForChange subscribes nc to the subject of natsTools (see GenerateKey)
// and calls function with every message decoded as a NATSResponse. It always
// calls wg.Done when it returns.
//
// If the subscription cannot be created, it logs the error and returns at once.
// Previously it fell through and waited forever on a channel nothing writes to,
// which also prevented wg.Done. Messages whose body is not a valid
// NATSResponse are logged and skipped, rather than handed to function
// half-decoded.
//
// NOTE(review): nothing in this function closes ch, so on a successful
// subscription the receive loop presumably runs for the connection's lifetime.
func (o *natsCaller) listenForChange(logger zerolog.Logger, nc *nats.Conn, natsTools NATSMethod,
	function func(NATSResponse), wg *sync.WaitGroup) {
	defer wg.Done()
	subject := natsTools.GenerateKey()
	ch := make(chan *nats.Msg, 64)
	logger.Info().Msg("Listening to " + subject)
	subs, err := nc.ChanSubscribe(subject, ch)
	if err != nil {
		logger.Error().Msg("Error listening to NATS : " + err.Error())
		return
	}
	defer subs.Unsubscribe()
	for msg := range ch {
		var resp NATSResponse
		if err := json.Unmarshal(msg.Data, &resp); err != nil {
			logger.Error().Msg("Error decoding NATS message on " + subject + " : " + err.Error())
			continue
		}
		logger.Info().Msg("Catching " + natsTools.String() + "... " + resp.FromApp + " - " + resp.Datatype.String())
		function(resp)
	}
}