package infrastructure import ( "encoding/json" "errors" "fmt" "oc-scheduler/infrastructure/scheduling" "strings" "time" oclib "cloud.o-forge.io/core/oc-lib" "cloud.o-forge.io/core/oc-lib/models/bill" "cloud.o-forge.io/core/oc-lib/models/booking" "cloud.o-forge.io/core/oc-lib/models/common/enum" "cloud.o-forge.io/core/oc-lib/models/common/pricing" "cloud.o-forge.io/core/oc-lib/models/order" "cloud.o-forge.io/core/oc-lib/models/peer" "cloud.o-forge.io/core/oc-lib/models/resources/purchase_resource" "cloud.o-forge.io/core/oc-lib/models/utils" "cloud.o-forge.io/core/oc-lib/models/workflow" "cloud.o-forge.io/core/oc-lib/models/workflow_execution" "cloud.o-forge.io/core/oc-lib/tools" "github.com/google/uuid" "github.com/robfig/cron" ) /* * WorkflowSchedule is a struct that contains the scheduling information of a workflow * It contains the mode of the schedule (Task or Service), the name of the schedule, the start and end time of the schedule and the cron expression */ // it's a flying object only use in a session time. It's not stored in the database type WorkflowSchedule struct { UUID string `json:"id" validate:"required"` // ExecutionsID is the list of the executions id of the workflow Workflow *workflow.Workflow `json:"workflow,omitempty"` // Workflow is the workflow dependancy of the schedule WorkflowExecution []*workflow_execution.WorkflowExecution `json:"workflow_executions,omitempty"` // WorkflowExecution is the list of executions of the workflow Message string `json:"message,omitempty"` // Message is the message of the schedule Warning string `json:"warning,omitempty"` // Warning is the warning message of the schedule Start time.Time `json:"start" validate:"required,ltfield=End"` // Start is the start time of the schedule, is required and must be less than the End time End *time.Time `json:"end,omitempty"` // End is the end time of the schedule, is required and must be greater than the Start time DurationS float64 `json:"duration_s" default:"-1"` // End is the end time of the schedule Cron string `json:"cron,omitempty"` // here the cron format : ss mm hh dd MM dw task BookingMode booking.BookingMode `json:"booking_mode,omitempty"` // BookingMode qualify the preemption order of the scheduling. if no payment allowed with preemption set up When_Possible SelectedInstances workflow.ConfigItem `json:"selected_instances"` SelectedPartnerships workflow.ConfigItem `json:"selected_partnerships"` SelectedBuyings workflow.ConfigItem `json:"selected_buyings"` SelectedStrategies workflow.ConfigItem `json:"selected_strategies"` SelectedBillingStrategy pricing.BillingStrategy `json:"selected_billing_strategy"` // Confirm, when true, triggers Schedule() to confirm the drafts held by this session. Confirm bool `json:"confirm,omitempty"` } func NewScheduler(mode int, start string, end string, durationInS float64, cron string) *WorkflowSchedule { ws := &WorkflowSchedule{ UUID: uuid.New().String(), Start: time.Now().UTC().Add(asapBuffer), BookingMode: booking.BookingMode(mode), DurationS: durationInS, Cron: cron, } s, err := time.ParseInLocation("2006-01-02T15:04:05", start, time.UTC) if err == nil && ws.BookingMode == booking.PLANNED { ws.Start = s // can apply a defined start other than now, if planned } e, err := time.ParseInLocation("2006-01-02T15:04:05", end, time.UTC) if err == nil { ws.End = &e } return ws } func (ws *WorkflowSchedule) GetBuyAndBook(wfID string, request *tools.APIRequest) (bool, *workflow.Workflow, []*workflow_execution.WorkflowExecution, []scheduling.SchedulerObject, []scheduling.SchedulerObject, error) { access := workflow.NewAccessor(request) res, code, err := access.LoadOne(wfID) if code != 200 { return false, nil, []*workflow_execution.WorkflowExecution{}, []scheduling.SchedulerObject{}, []scheduling.SchedulerObject{}, errors.New("could not load the workflow with id: " + err.Error()) } wf := res.(*workflow.Workflow) isPreemptible, longest, priceds, wf, err := wf.Planify(ws.Start, ws.End, ws.SelectedInstances, ws.SelectedPartnerships, ws.SelectedBuyings, ws.SelectedStrategies, int(ws.BookingMode), request) if err != nil { return false, wf, []*workflow_execution.WorkflowExecution{}, []scheduling.SchedulerObject{}, []scheduling.SchedulerObject{}, err } ws.DurationS = longest ws.Message = "We estimate that the workflow will start at " + ws.Start.String() + " and last " + fmt.Sprintf("%v", ws.DurationS) + " seconds." if ws.End != nil && ws.Start.Add(time.Duration(longest)*time.Second).After(*ws.End) { ws.Warning = "The workflow may be too long to be executed in the given time frame, we will try to book it anyway\n" } execs, err := ws.GetExecutions(wf, isPreemptible) if err != nil { return false, wf, []*workflow_execution.WorkflowExecution{}, []scheduling.SchedulerObject{}, []scheduling.SchedulerObject{}, err } purchased := []scheduling.SchedulerObject{} bookings := []scheduling.SchedulerObject{} for _, exec := range execs { for _, obj := range exec.Buy(ws.SelectedBillingStrategy, ws.UUID, wfID, priceds) { purchased = append(purchased, scheduling.ToSchedulerObject(tools.PURCHASE_RESOURCE, obj)) } for _, obj := range exec.Book(ws.UUID, wfID, priceds) { bookings = append(bookings, scheduling.ToSchedulerObject(tools.BOOKING, obj)) } } return true, wf, execs, purchased, bookings, nil } // GenerateOrder creates a draft order (+ draft bill) for the given purchases and bookings. // Returns the created order ID and any error. func (ws *WorkflowSchedule) GenerateOrder(purchases []scheduling.SchedulerObject, bookings []scheduling.SchedulerObject, executionsID string, request *tools.APIRequest) (string, error) { newOrder := &order.Order{ AbstractObject: utils.AbstractObject{ Name: "order_" + request.PeerID + "_" + time.Now().UTC().Format("2006-01-02T15:04:05"), IsDraft: true, }, ExecutionsID: executionsID, Purchases: []*purchase_resource.PurchaseResource{}, Bookings: []*booking.Booking{}, Status: enum.PENDING, } for _, purch := range purchases { newOrder.Purchases = append( newOrder.Purchases, scheduling.FromSchedulerObject(tools.PURCHASE_RESOURCE, purch).(*purchase_resource.PurchaseResource)) } for _, b := range bookings { newOrder.Bookings = append( newOrder.Bookings, scheduling.FromSchedulerObject(tools.BOOKING, b).(*booking.Booking)) } res, _, err := order.NewAccessor(request).StoreOne(newOrder) if err != nil { return "", err } if _, err := bill.DraftFirstBill(res.(*order.Order), request); err != nil { return res.GetID(), err } return res.GetID(), nil } func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*WorkflowSchedule, *workflow.Workflow, []*workflow_execution.WorkflowExecution, error) { if request == nil { return ws, nil, []*workflow_execution.WorkflowExecution{}, errors.New("no request found") } selfID, _ := oclib.GetMySelf() // If the client provides a scheduling_id from a Check session, confirm the // pre-created drafts (bookings/purchases). Executions already exist as drafts // and will be confirmed later by the considers mechanism. if ws.UUID != "" { adminReq := &tools.APIRequest{Admin: true} // Obsolescence check: abort if any session execution's start date has passed. executions := loadSessionExecs(ws.UUID) for _, exec := range executions { if !exec.ExecDate.IsZero() && exec.ExecDate.Before(time.Now().UTC()) { return ws, nil, nil, fmt.Errorf("execution %s is obsolete (start date in the past)", exec.GetID()) } } if err := ConfirmSession(ws.UUID, selfID, request); err != nil { return ws, nil, []*workflow_execution.WorkflowExecution{}, fmt.Errorf("confirm session failed: %w", err) } for _, exec := range executions { go WatchExecDeadline(exec.GetID(), exec.ExecutionsID, exec.ExecDate, selfID, request) } obj, _, _ := workflow.NewAccessor(request).LoadOne(wfID) if obj == nil { return ws, nil, executions, nil } wf := obj.(*workflow.Workflow) ws.Workflow = wf ws.WorkflowExecution = executions wf.GetAccessor(adminReq).UpdateOne(wf.Serialize(wf), wf.GetID()) return ws, wf, executions, nil } // Schedule must be called from a Check session (ws.UUID set above). // Direct scheduling without a prior Check session is not supported. return ws, nil, []*workflow_execution.WorkflowExecution{}, errors.New("no scheduling session: use the Check stream first") } // propagateResource routes a purchase or booking to its destination: // - If destPeerID matches our own peer (selfMongoID), the object is stored // directly in the local DB as draft and the local planner is refreshed. // - Otherwise a NATS CREATE_RESOURCE message is emitted so the destination // peer can process it asynchronously. // // The caller is responsible for setting obj.IsDraft before calling. func propagateResource(obj utils.DBObject, destPeerID string, dt tools.DataType, selfMongoID *peer.Peer, request *tools.APIRequest, errCh chan error) { if destPeerID == selfMongoID.GetID() { stored := oclib.NewRequestAdmin(oclib.LibDataEnum(dt), nil).StoreOne(obj.Serialize(obj)) if stored.Err != "" || stored.Data == nil { errCh <- fmt.Errorf("could not store %s locally: %s", dt.String(), stored.Err) return } // The planner tracks booking time-slots only; purchases do not affect it. if dt == tools.BOOKING { go refreshSelfPlanner(selfMongoID.PeerID, request) } errCh <- nil return } m := obj.Serialize(obj) if m["dest_peer_id"] != nil { if data := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil).LoadOne(fmt.Sprintf("%v", m["dest_peer_id"])); data.Data != nil { m["peer_id"] = data.Data.(*peer.Peer).PeerID } } else { fmt.Println("NO DEST ID") return } payload, err := json.Marshal(m) if err != nil { errCh <- fmt.Errorf("could not serialize %s: %w", dt.String(), err) return } if b, err := json.Marshal(&tools.PropalgationMessage{ DataType: dt.EnumIndex(), Action: tools.PB_CREATE, Payload: payload, }); err == nil { tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{ FromApp: "oc-scheduler", Datatype: dt, Method: int(tools.PROPALGATION_EVENT), Payload: b, }) } errCh <- nil } /* * getExecutions is a function that returns the executions of a workflow * it returns an array of workflow_execution.WorkflowExecution */ func (ws *WorkflowSchedule) GetExecutions(workflow *workflow.Workflow, isPreemptible bool) ([]*workflow_execution.WorkflowExecution, error) { workflows_executions := []*workflow_execution.WorkflowExecution{} dates, err := ws.GetDates() if err != nil { return workflows_executions, err } for _, date := range dates { obj := &workflow_execution.WorkflowExecution{ AbstractObject: utils.AbstractObject{ UUID: uuid.New().String(), // set the uuid of the execution Name: workflow.Name + "_execution_" + date.Start.String(), // set the name of the execution }, Priority: 1, ExecutionsID: ws.UUID, ExecDate: date.Start, // set the execution date EndDate: date.End, // set the end date State: enum.DRAFT, // set the state to 1 (scheduled) WorkflowID: workflow.GetID(), // set the workflow id dependancy of the execution } if ws.BookingMode != booking.PLANNED { obj.Priority = 0 } if ws.BookingMode == booking.PREEMPTED && isPreemptible { obj.Priority = 7 } ws.SelectedStrategies = obj.SelectedStrategies ws.SelectedPartnerships = obj.SelectedPartnerships ws.SelectedBuyings = obj.SelectedBuyings ws.SelectedInstances = obj.SelectedInstances workflows_executions = append(workflows_executions, obj) } return workflows_executions, nil } func (ws *WorkflowSchedule) GetDates() ([]Schedule, error) { schedule := []Schedule{} if len(ws.Cron) > 0 { // if cron is set then end date should be set if ws.End == nil { return schedule, errors.New("a cron task should have an end date") } if ws.DurationS <= 0 { ws.DurationS = ws.End.Sub(ws.Start).Seconds() } cronStr := strings.Split(ws.Cron, " ") // split the cron string to treat it if len(cronStr) < 6 { // if the cron string is less than 6 fields, return an error because format is : ss mm hh dd MM dw (6 fields) return schedule, errors.New("Bad cron message: (" + ws.Cron + "). Should be at least ss mm hh dd MM dw") } subCron := strings.Join(cronStr[:6], " ") // cron should be parsed as ss mm hh dd MM dw t (min 6 fields) specParser := cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow) // create a new cron parser sched, err := specParser.Parse(subCron) // parse the cron string if err != nil { return schedule, errors.New("Bad cron message: " + err.Error()) } // loop through the cron schedule to set the executions for s := sched.Next(ws.Start); !s.IsZero() && s.Before(*ws.End); s = sched.Next(s) { e := s.Add(time.Duration(ws.DurationS) * time.Second) schedule = append(schedule, Schedule{ Start: s, End: &e, }) } } else { // if no cron, set the execution to the start date schedule = append(schedule, Schedule{ Start: ws.Start, End: ws.End, }) } return schedule, nil } type Schedule struct { Start time.Time End *time.Time } /* * TODO : LARGEST GRAIN PLANIFYING THE WORKFLOW WHEN OPTION IS SET * SET PROTECTION BORDER TIME */