Files
oc-scheduler/infrastructure/scheduler.go

321 lines
14 KiB
Go
Raw Permalink Normal View History

2026-01-14 15:15:26 +01:00
package infrastructure
import (
2026-02-23 18:10:47 +01:00
"encoding/json"
2026-01-14 15:15:26 +01:00
"errors"
"fmt"
2026-03-17 11:58:27 +01:00
"oc-scheduler/infrastructure/scheduling"
2026-01-14 15:15:26 +01:00
"strings"
"time"
2026-02-23 18:10:47 +01:00
oclib "cloud.o-forge.io/core/oc-lib"
2026-01-14 15:15:26 +01:00
"cloud.o-forge.io/core/oc-lib/models/bill"
"cloud.o-forge.io/core/oc-lib/models/booking"
"cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/common/pricing"
"cloud.o-forge.io/core/oc-lib/models/order"
"cloud.o-forge.io/core/oc-lib/models/peer"
"cloud.o-forge.io/core/oc-lib/models/resources/purchase_resource"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/models/workflow"
"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
"cloud.o-forge.io/core/oc-lib/tools"
"github.com/google/uuid"
"github.com/robfig/cron"
)
/*
* WorkflowSchedule is a struct that contains the scheduling information of a workflow
* It contains the mode of the schedule (Task or Service), the name of the schedule, the start and end time of the schedule and the cron expression
*/
// it's a flying object only use in a session time. It's not stored in the database
type WorkflowSchedule struct {
UUID string `json:"id" validate:"required"` // ExecutionsID is the list of the executions id of the workflow
Workflow *workflow.Workflow `json:"workflow,omitempty"` // Workflow is the workflow dependancy of the schedule
WorkflowExecution []*workflow_execution.WorkflowExecution `json:"workflow_executions,omitempty"` // WorkflowExecution is the list of executions of the workflow
Message string `json:"message,omitempty"` // Message is the message of the schedule
Warning string `json:"warning,omitempty"` // Warning is the warning message of the schedule
Start time.Time `json:"start" validate:"required,ltfield=End"` // Start is the start time of the schedule, is required and must be less than the End time
End *time.Time `json:"end,omitempty"` // End is the end time of the schedule, is required and must be greater than the Start time
DurationS float64 `json:"duration_s" default:"-1"` // End is the end time of the schedule
Cron string `json:"cron,omitempty"` // here the cron format : ss mm hh dd MM dw task
BookingMode booking.BookingMode `json:"booking_mode,omitempty"` // BookingMode qualify the preemption order of the scheduling. if no payment allowed with preemption set up When_Possible
SelectedInstances workflow.ConfigItem `json:"selected_instances"`
SelectedPartnerships workflow.ConfigItem `json:"selected_partnerships"`
SelectedBuyings workflow.ConfigItem `json:"selected_buyings"`
SelectedStrategies workflow.ConfigItem `json:"selected_strategies"`
SelectedBillingStrategy pricing.BillingStrategy `json:"selected_billing_strategy"`
2026-03-17 11:58:27 +01:00
// Confirm, when true, triggers Schedule() to confirm the drafts held by this session.
Confirm bool `json:"confirm,omitempty"`
2026-01-14 15:15:26 +01:00
}
func NewScheduler(mode int, start string, end string, durationInS float64, cron string) *WorkflowSchedule {
ws := &WorkflowSchedule{
UUID: uuid.New().String(),
2026-03-19 09:25:46 +01:00
Start: time.Now().UTC().Add(asapBuffer),
2026-01-14 15:15:26 +01:00
BookingMode: booking.BookingMode(mode),
DurationS: durationInS,
Cron: cron,
}
2026-03-19 09:25:46 +01:00
s, err := time.ParseInLocation("2006-01-02T15:04:05", start, time.UTC)
2026-01-14 15:15:26 +01:00
if err == nil && ws.BookingMode == booking.PLANNED {
ws.Start = s // can apply a defined start other than now, if planned
}
2026-03-19 09:25:46 +01:00
e, err := time.ParseInLocation("2006-01-02T15:04:05", end, time.UTC)
2026-01-14 15:15:26 +01:00
if err == nil {
ws.End = &e
}
return ws
}
2026-03-17 11:58:27 +01:00
func (ws *WorkflowSchedule) GetBuyAndBook(wfID string, request *tools.APIRequest) (bool, *workflow.Workflow, []*workflow_execution.WorkflowExecution, []scheduling.SchedulerObject, []scheduling.SchedulerObject, error) {
2026-01-14 15:15:26 +01:00
access := workflow.NewAccessor(request)
res, code, err := access.LoadOne(wfID)
if code != 200 {
2026-03-17 11:58:27 +01:00
return false, nil, []*workflow_execution.WorkflowExecution{}, []scheduling.SchedulerObject{}, []scheduling.SchedulerObject{}, errors.New("could not load the workflow with id: " + err.Error())
2026-01-14 15:15:26 +01:00
}
wf := res.(*workflow.Workflow)
isPreemptible, longest, priceds, wf, err := wf.Planify(ws.Start, ws.End,
ws.SelectedInstances, ws.SelectedPartnerships, ws.SelectedBuyings, ws.SelectedStrategies,
int(ws.BookingMode), request)
if err != nil {
2026-03-17 11:58:27 +01:00
return false, wf, []*workflow_execution.WorkflowExecution{}, []scheduling.SchedulerObject{}, []scheduling.SchedulerObject{}, err
2026-01-14 15:15:26 +01:00
}
ws.DurationS = longest
ws.Message = "We estimate that the workflow will start at " + ws.Start.String() + " and last " + fmt.Sprintf("%v", ws.DurationS) + " seconds."
if ws.End != nil && ws.Start.Add(time.Duration(longest)*time.Second).After(*ws.End) {
ws.Warning = "The workflow may be too long to be executed in the given time frame, we will try to book it anyway\n"
}
execs, err := ws.GetExecutions(wf, isPreemptible)
if err != nil {
2026-03-17 11:58:27 +01:00
return false, wf, []*workflow_execution.WorkflowExecution{}, []scheduling.SchedulerObject{}, []scheduling.SchedulerObject{}, err
2026-01-14 15:15:26 +01:00
}
2026-03-17 11:58:27 +01:00
purchased := []scheduling.SchedulerObject{}
bookings := []scheduling.SchedulerObject{}
2026-01-14 15:15:26 +01:00
for _, exec := range execs {
2026-03-17 11:58:27 +01:00
for _, obj := range exec.Buy(ws.SelectedBillingStrategy, ws.UUID, wfID, priceds) {
purchased = append(purchased, scheduling.ToSchedulerObject(tools.PURCHASE_RESOURCE, obj))
}
for _, obj := range exec.Book(ws.UUID, wfID, priceds) {
bookings = append(bookings, scheduling.ToSchedulerObject(tools.BOOKING, obj))
}
2026-01-14 15:15:26 +01:00
}
return true, wf, execs, purchased, bookings, nil
}
2026-03-17 11:58:27 +01:00
// GenerateOrder creates a draft order (+ draft bill) for the given purchases and bookings.
// Returns the created order ID and any error.
func (ws *WorkflowSchedule) GenerateOrder(purchases []scheduling.SchedulerObject, bookings []scheduling.SchedulerObject, executionsID string, request *tools.APIRequest) (string, error) {
2026-01-14 15:15:26 +01:00
newOrder := &order.Order{
AbstractObject: utils.AbstractObject{
Name: "order_" + request.PeerID + "_" + time.Now().UTC().Format("2006-01-02T15:04:05"),
IsDraft: true,
},
2026-03-17 11:58:27 +01:00
ExecutionsID: executionsID,
Purchases: []*purchase_resource.PurchaseResource{},
Bookings: []*booking.Booking{},
2026-01-14 15:15:26 +01:00
Status: enum.PENDING,
}
2026-03-17 11:58:27 +01:00
for _, purch := range purchases {
newOrder.Purchases = append(
newOrder.Purchases, scheduling.FromSchedulerObject(tools.PURCHASE_RESOURCE, purch).(*purchase_resource.PurchaseResource))
}
for _, b := range bookings {
newOrder.Bookings = append(
newOrder.Bookings, scheduling.FromSchedulerObject(tools.BOOKING, b).(*booking.Booking))
2026-01-14 15:15:26 +01:00
}
2026-03-17 11:58:27 +01:00
res, _, err := order.NewAccessor(request).StoreOne(newOrder)
if err != nil {
return "", err
}
if _, err := bill.DraftFirstBill(res.(*order.Order), request); err != nil {
return res.GetID(), err
}
return res.GetID(), nil
2026-01-14 15:15:26 +01:00
}
func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*WorkflowSchedule, *workflow.Workflow, []*workflow_execution.WorkflowExecution, error) {
if request == nil {
return ws, nil, []*workflow_execution.WorkflowExecution{}, errors.New("no request found")
}
2026-02-23 18:10:47 +01:00
selfID, _ := oclib.GetMySelf()
2026-01-14 15:15:26 +01:00
2026-03-17 11:58:27 +01:00
// If the client provides a scheduling_id from a Check session, confirm the
// pre-created drafts (bookings/purchases). Executions already exist as drafts
// and will be confirmed later by the considers mechanism.
if ws.UUID != "" {
adminReq := &tools.APIRequest{Admin: true}
// Obsolescence check: abort if any session execution's start date has passed.
executions := loadSessionExecs(ws.UUID)
for _, exec := range executions {
2026-03-19 09:25:46 +01:00
if !exec.ExecDate.IsZero() && exec.ExecDate.Before(time.Now().UTC()) {
2026-03-17 11:58:27 +01:00
return ws, nil, nil, fmt.Errorf("execution %s is obsolete (start date in the past)", exec.GetID())
}
2026-01-14 15:15:26 +01:00
}
2026-03-17 11:58:27 +01:00
if err := ConfirmSession(ws.UUID, selfID, request); err != nil {
return ws, nil, []*workflow_execution.WorkflowExecution{}, fmt.Errorf("confirm session failed: %w", err)
2026-01-14 15:15:26 +01:00
}
2026-03-17 11:58:27 +01:00
for _, exec := range executions {
2026-03-19 09:25:46 +01:00
go WatchExecDeadline(exec.GetID(), exec.ExecutionsID, exec.ExecDate, selfID, request)
2026-03-17 11:58:27 +01:00
}
2026-01-14 15:15:26 +01:00
2026-03-17 11:58:27 +01:00
obj, _, _ := workflow.NewAccessor(request).LoadOne(wfID)
if obj == nil {
return ws, nil, executions, nil
2026-01-14 15:15:26 +01:00
}
2026-03-17 11:58:27 +01:00
wf := obj.(*workflow.Workflow)
ws.Workflow = wf
ws.WorkflowExecution = executions
wf.GetAccessor(adminReq).UpdateOne(wf.Serialize(wf), wf.GetID())
return ws, wf, executions, nil
2026-01-14 15:15:26 +01:00
}
2026-03-17 11:58:27 +01:00
// Schedule must be called from a Check session (ws.UUID set above).
// Direct scheduling without a prior Check session is not supported.
return ws, nil, []*workflow_execution.WorkflowExecution{}, errors.New("no scheduling session: use the Check stream first")
2026-01-14 15:15:26 +01:00
}
2026-02-23 18:10:47 +01:00
// propagateResource routes a purchase or booking to its destination:
// - If destPeerID matches our own peer (selfMongoID), the object is stored
// directly in the local DB as draft and the local planner is refreshed.
// - Otherwise a NATS CREATE_RESOURCE message is emitted so the destination
// peer can process it asynchronously.
//
2026-03-17 11:58:27 +01:00
// The caller is responsible for setting obj.IsDraft before calling.
2026-02-23 18:10:47 +01:00
func propagateResource(obj utils.DBObject, destPeerID string, dt tools.DataType, selfMongoID *peer.Peer, request *tools.APIRequest, errCh chan error) {
if destPeerID == selfMongoID.GetID() {
2026-03-17 11:58:27 +01:00
stored := oclib.NewRequestAdmin(oclib.LibDataEnum(dt), nil).StoreOne(obj.Serialize(obj))
if stored.Err != "" || stored.Data == nil {
errCh <- fmt.Errorf("could not store %s locally: %s", dt.String(), stored.Err)
2026-02-23 18:10:47 +01:00
return
}
// The planner tracks booking time-slots only; purchases do not affect it.
if dt == tools.BOOKING {
go refreshSelfPlanner(selfMongoID.PeerID, request)
}
errCh <- nil
2026-01-14 15:15:26 +01:00
return
}
2026-03-17 11:58:27 +01:00
m := obj.Serialize(obj)
if m["dest_peer_id"] != nil {
if data := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil).LoadOne(fmt.Sprintf("%v", m["dest_peer_id"])); data.Data != nil {
m["peer_id"] = data.Data.(*peer.Peer).PeerID
}
} else {
fmt.Println("NO DEST ID")
return
}
payload, err := json.Marshal(m)
2026-02-23 18:10:47 +01:00
if err != nil {
errCh <- fmt.Errorf("could not serialize %s: %w", dt.String(), err)
2026-01-14 15:15:26 +01:00
return
}
2026-03-17 11:58:27 +01:00
if b, err := json.Marshal(&tools.PropalgationMessage{
DataType: dt.EnumIndex(),
Action: tools.PB_CREATE,
2026-02-23 18:10:47 +01:00
Payload: payload,
2026-03-17 11:58:27 +01:00
}); err == nil {
tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{
FromApp: "oc-scheduler",
Datatype: dt,
Method: int(tools.PROPALGATION_EVENT),
Payload: b,
})
}
2026-01-14 15:15:26 +01:00
errCh <- nil
}
// GetExecutions builds one draft execution of the given workflow per date
// returned by ws.GetDates.
//
// Each execution gets a fresh UUID, ws.UUID as ExecutionsID, the DRAFT state
// and a copy of the schedule's selections (strategies, partnerships, buyings,
// instances). Its priority is 1 for PLANNED bookings, 0 for any other mode, and
// 7 when the mode is PREEMPTED and isPreemptible is true.
//
// Returns an empty (non-nil) slice with an error when the workflow is nil or
// the dates cannot be computed.
//
// NOTE: the parameter named workflow shadows the workflow package inside the
// function body.
func (ws *WorkflowSchedule) GetExecutions(workflow *workflow.Workflow, isPreemptible bool) ([]*workflow_execution.WorkflowExecution, error) {
	workflows_executions := []*workflow_execution.WorkflowExecution{}
	if workflow == nil {
		return workflows_executions, errors.New("no workflow provided")
	}
	dates, err := ws.GetDates()
	if err != nil {
		return workflows_executions, err
	}
	for _, date := range dates {
		obj := &workflow_execution.WorkflowExecution{
			AbstractObject: utils.AbstractObject{
				UUID: uuid.New().String(),                                 // unique id of this execution
				Name: workflow.Name + "_execution_" + date.Start.String(), // name derived from workflow and start date
			},
			Priority:     1,
			ExecutionsID: ws.UUID,
			ExecDate:     date.Start,
			EndDate:      date.End,
			State:        enum.DRAFT, // executions start as drafts
			WorkflowID:   workflow.GetID(),
		}
		switch {
		case ws.BookingMode == booking.PREEMPTED && isPreemptible:
			obj.Priority = 7
		case ws.BookingMode != booking.PLANNED:
			obj.Priority = 0
		}
		// Carry the schedule's selections over to the execution. The original
		// assignment direction was inverted and wiped them from ws.
		obj.SelectedStrategies = ws.SelectedStrategies
		obj.SelectedPartnerships = ws.SelectedPartnerships
		obj.SelectedBuyings = ws.SelectedBuyings
		obj.SelectedInstances = ws.SelectedInstances
		workflows_executions = append(workflows_executions, obj)
	}
	return workflows_executions, nil
}
// GetDates returns the time slots at which the workflow must run.
//
// Without a cron expression, a single slot {ws.Start, ws.End} is returned (End
// may be nil).
//
// With a cron expression, ws.End is mandatory. The expression is split on any
// whitespace and must have at least six fields (ss mm hh dd MM dw); extra
// fields are ignored. One slot is produced for every activation strictly before
// ws.End, starting from the first activation returned by Next(ws.Start). Each
// slot lasts ws.DurationS seconds. When ws.DurationS is <= 0 it is first set to
// End - Start, which mutates ws.
//
// On error an empty (non-nil) slice is returned with the error.
func (ws *WorkflowSchedule) GetDates() ([]Schedule, error) {
	schedule := []Schedule{}
	if len(ws.Cron) == 0 {
		// No cron: a single execution at the start date.
		return append(schedule, Schedule{Start: ws.Start, End: ws.End}), nil
	}

	if ws.End == nil {
		return schedule, errors.New("a cron task should have an end date")
	}
	if ws.DurationS <= 0 {
		ws.DurationS = ws.End.Sub(ws.Start).Seconds()
	}

	// Fields tolerates repeated, leading and trailing whitespace, which a
	// split on a single space would turn into empty fields.
	fields := strings.Fields(ws.Cron)
	if len(fields) < 6 {
		return schedule, errors.New("Bad cron message: (" + ws.Cron + "). Should be at least ss mm hh dd MM dw")
	}
	spec := strings.Join(fields[:6], " ")
	specParser := cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)
	sched, err := specParser.Parse(spec)
	if err != nil {
		return schedule, errors.New("Bad cron message: " + err.Error())
	}

	// Convert through float64 nanoseconds so fractional seconds are kept.
	length := time.Duration(ws.DurationS * float64(time.Second))
	for s := sched.Next(ws.Start); !s.IsZero() && s.Before(*ws.End); s = sched.Next(s) {
		e := s.Add(length)
		schedule = append(schedule, Schedule{
			Start: s,
			End:   &e,
		})
	}
	return schedule, nil
}
// Schedule is one time slot produced by WorkflowSchedule.GetDates.
type Schedule struct {
	// Start is the start time of the slot.
	Start time.Time
	// End is the end time of the slot. It is nil when the schedule has no cron
	// expression and no end date was set.
	End *time.Time
}
/*
* TODO : LARGEST GRAIN PLANIFYING THE WORKFLOW WHEN OPTION IS SET
* SET PROTECTION BORDER TIME
*/