package daemons

import (
	"encoding/json"
	"fmt"
	"sync"
	"time"

	oclib "cloud.o-forge.io/core/oc-lib"
	"cloud.o-forge.io/core/oc-lib/dbs"
	"cloud.o-forge.io/core/oc-lib/models/common/enum"
	"cloud.o-forge.io/core/oc-lib/models/resources"
	"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
	"cloud.o-forge.io/core/oc-lib/tools"
	"github.com/google/uuid"
	"github.com/rs/zerolog"
	"go.mongodb.org/mongo-driver/bson/primitive"
)

// ScheduledExecution is an in-memory set of workflow executions keyed by
// their UUID. Mu guards every access to Execs.
type ScheduledExecution struct {
	Execs map[string]workflow_execution.WorkflowExecution
	Mu    sync.Mutex
}

// DeleteSchedules removes the execution whose UUID is carried in the "id"
// field of the JSON payload of resp. A payload that cannot be decoded is
// reported on stdout and leaves the set untouched.
func (sb *ScheduledExecution) DeleteSchedules(resp tools.NATSResponse) {
	var m map[string]string
	// Unmarshal needs a pointer to the destination; passing the nil map by
	// value always fails and leaves it empty.
	if err := json.Unmarshal(resp.Payload, &m); err != nil {
		fmt.Println("DeleteSchedules: could not decode payload:", err)
		return
	}

	sb.Mu.Lock()
	defer sb.Mu.Unlock()
	delete(sb.Execs, m["id"])
}

// AddSchedules stores every execution of new_executions that is not already
// present (same UUID). Existing entries are never overwritten. The logger
// parameter is currently unused and kept for interface compatibility.
func (sb *ScheduledExecution) AddSchedules(new_executions []*workflow_execution.WorkflowExecution, logger zerolog.Logger) {
	sb.Mu.Lock()
	defer sb.Mu.Unlock()

	// Writing to a nil map panics; make the zero value usable.
	if sb.Execs == nil {
		sb.Execs = make(map[string]workflow_execution.WorkflowExecution, len(new_executions))
	}

	for _, exec := range new_executions {
		if exec == nil {
			continue
		}
		isNew := !sb.execIsSet(exec)
		fmt.Println("Adding "+exec.UUID, isNew)
		if isNew {
			sb.Execs[exec.UUID] = *exec
		}
	}
}

// execIsSet reports whether an execution with the same UUID is already
// stored. The caller must hold sb.Mu.
func (sb *ScheduledExecution) execIsSet(exec *workflow_execution.WorkflowExecution) bool {
	_, ok := sb.Execs[exec.UUID]
	return ok
}

// ScheduleManager feeds the package-level Executions set (declared elsewhere
// in the package) with the workflow executions that are about to start.
//
// The NATS daemon listens to the subject "workflowsUpdate".
// workflowsUpdate messages must be formatted following this pattern:
// '{"workflow" : "", "start_date" : "", "stop_date" : "" }'
type ScheduleManager struct {
	Logger zerolog.Logger
}

// SchedulePolling is used at launch of the component to retrieve the next
// scheduled workflows, and then polls again every 20 seconds in case some
// workflows were scheduled before launch. It never returns.
func (s *ScheduleManager) SchedulePolling() {
	const pollInterval = 20 * time.Second

	for {
		s.GetNextScheduledWorkflows(tools.NATSResponse{})

		// Read the size under the lock: other goroutines may be adding or
		// deleting schedules concurrently.
		Executions.Mu.Lock()
		count := len(Executions.Execs)
		Executions.Mu.Unlock()

		s.Logger.Info().Msg(fmt.Sprintf("Current list of schedules -------> %d", count))
		time.Sleep(pollInterval)
	}
}

// getExecution returns the workflow executions in state SCHEDULED whose
// execution_date lies within [from, to]. It returns an error when the search
// request does not answer with code 200.
func (s *ScheduleManager) getExecution(from time.Time, to time.Time) ([]*workflow_execution.WorkflowExecution, error) {
	fmt.Printf("Getting workflows execution from %s to %s \n", from.String(), to.String())

	f := dbs.Filters{
		And: map[string][]dbs.Filter{
			"execution_date": {
				{Operator: dbs.GTE.String(), Value: primitive.NewDateTimeFromTime(from)},
				{Operator: dbs.LTE.String(), Value: primitive.NewDateTimeFromTime(to)},
			},
			"state": {{Operator: dbs.EQUAL.String(), Value: enum.SCHEDULED}},
		},
	}

	res := oclib.NewRequest(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), "", "", []string{}, nil).Search(&f, "", false)
	if res.Code != 200 {
		// Surface the failure to the caller instead of returning (nil, nil),
		// which made a failed lookup indistinguishable from an empty one.
		return nil, fmt.Errorf("loading workflow executions: %s", res.Err)
	}

	execs := make([]*workflow_execution.WorkflowExecution, 0, len(res.Data))
	for _, item := range res.Data {
		// Skip anything that is not a workflow execution rather than panic.
		if exec, ok := item.(*workflow_execution.WorkflowExecution); ok {
			execs = append(execs, exec)
		}
	}

	fmt.Printf("Found %d workflows %v\n", len(execs), res)
	return execs, nil
}

// ExecuteWorkflow schedules an immediate execution of the workflow whose ID
// is given in the "workflow_id" field of the JSON payload of resp. Nothing is
// scheduled when the payload cannot be decoded or the workflow cannot be
// loaded; both cases are logged.
func (s *ScheduleManager) ExecuteWorkflow(resp tools.NATSResponse) {
	var m map[string]string
	if err := json.Unmarshal(resp.Payload, &m); err != nil {
		s.Logger.Error().Err(err).Msg("Could not decode workflow execution request")
		return
	}

	res := resources.WorkflowResource{}
	access := res.GetAccessor(&tools.APIRequest{})

	d, code, err := access.LoadOne(m["workflow_id"])
	if code != 200 || err != nil {
		s.Logger.Error().Err(err).Msgf("Could not load workflow %q (code %v)", m["workflow_id"], code)
		return
	}

	// NOTE(review): no UUID is assigned here, so AddSchedules appears to key
	// this execution under the empty string and later event-triggered
	// executions would be treated as duplicates — confirm whether a UUID
	// should be generated at this point.
	eventExec := &workflow_execution.WorkflowExecution{
		WorkflowID:   d.GetID(),
		ExecDate:     time.Now(),
		ExecutionsID: uuid.New().String(),
		State:        enum.SCHEDULED,
	}
	Executions.AddSchedules([]*workflow_execution.WorkflowExecution{eventExec}, s.Logger)
}

// GetNextScheduledWorkflows loads the executions scheduled between one second
// ago and one minute from now (UTC) and adds them to Executions. The NATS
// response argument is ignored.
func (s *ScheduleManager) GetNextScheduledWorkflows(_ tools.NATSResponse) {
	start := time.Now().UTC()
	from := start.Add(-time.Second)
	to := start.Add(time.Minute)

	// Query once; the lookup hits the database.
	nextExecs, err := s.getExecution(from, to)
	if err != nil {
		s.Logger.Error().Err(err).Msg("Could not retrieve next schedules")
		return
	}
	Executions.AddSchedules(nextExecs, s.Logger)
}