2026-01-14 15:15:26 +01:00
package infrastructure
import (
2026-02-23 18:10:47 +01:00
"encoding/json"
2026-01-14 15:15:26 +01:00
"errors"
"fmt"
2026-03-17 11:58:27 +01:00
"oc-scheduler/infrastructure/scheduling"
2026-01-14 15:15:26 +01:00
"strings"
"time"
2026-02-23 18:10:47 +01:00
oclib "cloud.o-forge.io/core/oc-lib"
2026-01-14 15:15:26 +01:00
"cloud.o-forge.io/core/oc-lib/models/bill"
"cloud.o-forge.io/core/oc-lib/models/booking"
"cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/common/pricing"
"cloud.o-forge.io/core/oc-lib/models/order"
"cloud.o-forge.io/core/oc-lib/models/peer"
"cloud.o-forge.io/core/oc-lib/models/resources/purchase_resource"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/models/workflow"
"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
"cloud.o-forge.io/core/oc-lib/tools"
"github.com/google/uuid"
"github.com/robfig/cron"
)
/ *
* WorkflowSchedule is a struct that contains the scheduling information of a workflow
* It contains the mode of the schedule ( Task or Service ) , the name of the schedule , the start and end time of the schedule and the cron expression
* /
// it's a flying object only use in a session time. It's not stored in the database
type WorkflowSchedule struct {
UUID string ` json:"id" validate:"required" ` // ExecutionsID is the list of the executions id of the workflow
Workflow * workflow . Workflow ` json:"workflow,omitempty" ` // Workflow is the workflow dependancy of the schedule
WorkflowExecution [ ] * workflow_execution . WorkflowExecution ` json:"workflow_executions,omitempty" ` // WorkflowExecution is the list of executions of the workflow
Message string ` json:"message,omitempty" ` // Message is the message of the schedule
Warning string ` json:"warning,omitempty" ` // Warning is the warning message of the schedule
Start time . Time ` json:"start" validate:"required,ltfield=End" ` // Start is the start time of the schedule, is required and must be less than the End time
End * time . Time ` json:"end,omitempty" ` // End is the end time of the schedule, is required and must be greater than the Start time
DurationS float64 ` json:"duration_s" default:"-1" ` // End is the end time of the schedule
Cron string ` json:"cron,omitempty" ` // here the cron format : ss mm hh dd MM dw task
BookingMode booking . BookingMode ` json:"booking_mode,omitempty" ` // BookingMode qualify the preemption order of the scheduling. if no payment allowed with preemption set up When_Possible
SelectedInstances workflow . ConfigItem ` json:"selected_instances" `
SelectedPartnerships workflow . ConfigItem ` json:"selected_partnerships" `
SelectedBuyings workflow . ConfigItem ` json:"selected_buyings" `
SelectedStrategies workflow . ConfigItem ` json:"selected_strategies" `
SelectedBillingStrategy pricing . BillingStrategy ` json:"selected_billing_strategy" `
2026-03-17 11:58:27 +01:00
// Confirm, when true, triggers Schedule() to confirm the drafts held by this session.
Confirm bool ` json:"confirm,omitempty" `
2026-01-14 15:15:26 +01:00
}
func NewScheduler ( mode int , start string , end string , durationInS float64 , cron string ) * WorkflowSchedule {
ws := & WorkflowSchedule {
UUID : uuid . New ( ) . String ( ) ,
2026-03-19 09:25:46 +01:00
Start : time . Now ( ) . UTC ( ) . Add ( asapBuffer ) ,
2026-01-14 15:15:26 +01:00
BookingMode : booking . BookingMode ( mode ) ,
DurationS : durationInS ,
Cron : cron ,
}
2026-03-19 09:25:46 +01:00
s , err := time . ParseInLocation ( "2006-01-02T15:04:05" , start , time . UTC )
2026-01-14 15:15:26 +01:00
if err == nil && ws . BookingMode == booking . PLANNED {
ws . Start = s // can apply a defined start other than now, if planned
}
2026-03-19 09:25:46 +01:00
e , err := time . ParseInLocation ( "2006-01-02T15:04:05" , end , time . UTC )
2026-01-14 15:15:26 +01:00
if err == nil {
ws . End = & e
}
return ws
}
2026-03-17 11:58:27 +01:00
func ( ws * WorkflowSchedule ) GetBuyAndBook ( wfID string , request * tools . APIRequest ) ( bool , * workflow . Workflow , [ ] * workflow_execution . WorkflowExecution , [ ] scheduling . SchedulerObject , [ ] scheduling . SchedulerObject , error ) {
2026-01-14 15:15:26 +01:00
access := workflow . NewAccessor ( request )
res , code , err := access . LoadOne ( wfID )
if code != 200 {
2026-03-17 11:58:27 +01:00
return false , nil , [ ] * workflow_execution . WorkflowExecution { } , [ ] scheduling . SchedulerObject { } , [ ] scheduling . SchedulerObject { } , errors . New ( "could not load the workflow with id: " + err . Error ( ) )
2026-01-14 15:15:26 +01:00
}
wf := res . ( * workflow . Workflow )
isPreemptible , longest , priceds , wf , err := wf . Planify ( ws . Start , ws . End ,
ws . SelectedInstances , ws . SelectedPartnerships , ws . SelectedBuyings , ws . SelectedStrategies ,
int ( ws . BookingMode ) , request )
if err != nil {
2026-03-17 11:58:27 +01:00
return false , wf , [ ] * workflow_execution . WorkflowExecution { } , [ ] scheduling . SchedulerObject { } , [ ] scheduling . SchedulerObject { } , err
2026-01-14 15:15:26 +01:00
}
ws . DurationS = longest
ws . Message = "We estimate that the workflow will start at " + ws . Start . String ( ) + " and last " + fmt . Sprintf ( "%v" , ws . DurationS ) + " seconds."
if ws . End != nil && ws . Start . Add ( time . Duration ( longest ) * time . Second ) . After ( * ws . End ) {
ws . Warning = "The workflow may be too long to be executed in the given time frame, we will try to book it anyway\n"
}
execs , err := ws . GetExecutions ( wf , isPreemptible )
if err != nil {
2026-03-17 11:58:27 +01:00
return false , wf , [ ] * workflow_execution . WorkflowExecution { } , [ ] scheduling . SchedulerObject { } , [ ] scheduling . SchedulerObject { } , err
2026-01-14 15:15:26 +01:00
}
2026-03-17 11:58:27 +01:00
purchased := [ ] scheduling . SchedulerObject { }
bookings := [ ] scheduling . SchedulerObject { }
2026-01-14 15:15:26 +01:00
for _ , exec := range execs {
2026-03-17 11:58:27 +01:00
for _ , obj := range exec . Buy ( ws . SelectedBillingStrategy , ws . UUID , wfID , priceds ) {
purchased = append ( purchased , scheduling . ToSchedulerObject ( tools . PURCHASE_RESOURCE , obj ) )
}
for _ , obj := range exec . Book ( ws . UUID , wfID , priceds ) {
bookings = append ( bookings , scheduling . ToSchedulerObject ( tools . BOOKING , obj ) )
}
2026-01-14 15:15:26 +01:00
}
return true , wf , execs , purchased , bookings , nil
}
2026-03-17 11:58:27 +01:00
// GenerateOrder creates a draft order (+ draft bill) for the given purchases and bookings.
// Returns the created order ID and any error.
func ( ws * WorkflowSchedule ) GenerateOrder ( purchases [ ] scheduling . SchedulerObject , bookings [ ] scheduling . SchedulerObject , executionsID string , request * tools . APIRequest ) ( string , error ) {
2026-01-14 15:15:26 +01:00
newOrder := & order . Order {
AbstractObject : utils . AbstractObject {
Name : "order_" + request . PeerID + "_" + time . Now ( ) . UTC ( ) . Format ( "2006-01-02T15:04:05" ) ,
IsDraft : true ,
} ,
2026-03-17 11:58:27 +01:00
ExecutionsID : executionsID ,
Purchases : [ ] * purchase_resource . PurchaseResource { } ,
Bookings : [ ] * booking . Booking { } ,
2026-01-14 15:15:26 +01:00
Status : enum . PENDING ,
}
2026-03-17 11:58:27 +01:00
for _ , purch := range purchases {
newOrder . Purchases = append (
newOrder . Purchases , scheduling . FromSchedulerObject ( tools . PURCHASE_RESOURCE , purch ) . ( * purchase_resource . PurchaseResource ) )
}
for _ , b := range bookings {
newOrder . Bookings = append (
newOrder . Bookings , scheduling . FromSchedulerObject ( tools . BOOKING , b ) . ( * booking . Booking ) )
2026-01-14 15:15:26 +01:00
}
2026-03-17 11:58:27 +01:00
res , _ , err := order . NewAccessor ( request ) . StoreOne ( newOrder )
if err != nil {
return "" , err
}
if _ , err := bill . DraftFirstBill ( res . ( * order . Order ) , request ) ; err != nil {
return res . GetID ( ) , err
}
return res . GetID ( ) , nil
2026-01-14 15:15:26 +01:00
}
func ( ws * WorkflowSchedule ) Schedules ( wfID string , request * tools . APIRequest ) ( * WorkflowSchedule , * workflow . Workflow , [ ] * workflow_execution . WorkflowExecution , error ) {
if request == nil {
return ws , nil , [ ] * workflow_execution . WorkflowExecution { } , errors . New ( "no request found" )
}
2026-02-23 18:10:47 +01:00
selfID , _ := oclib . GetMySelf ( )
2026-01-14 15:15:26 +01:00
2026-03-17 11:58:27 +01:00
// If the client provides a scheduling_id from a Check session, confirm the
// pre-created drafts (bookings/purchases). Executions already exist as drafts
// and will be confirmed later by the considers mechanism.
if ws . UUID != "" {
adminReq := & tools . APIRequest { Admin : true }
// Obsolescence check: abort if any session execution's start date has passed.
executions := loadSessionExecs ( ws . UUID )
for _ , exec := range executions {
2026-03-19 09:25:46 +01:00
if ! exec . ExecDate . IsZero ( ) && exec . ExecDate . Before ( time . Now ( ) . UTC ( ) ) {
2026-03-17 11:58:27 +01:00
return ws , nil , nil , fmt . Errorf ( "execution %s is obsolete (start date in the past)" , exec . GetID ( ) )
}
2026-01-14 15:15:26 +01:00
}
2026-03-17 11:58:27 +01:00
if err := ConfirmSession ( ws . UUID , selfID , request ) ; err != nil {
return ws , nil , [ ] * workflow_execution . WorkflowExecution { } , fmt . Errorf ( "confirm session failed: %w" , err )
2026-01-14 15:15:26 +01:00
}
2026-03-17 11:58:27 +01:00
for _ , exec := range executions {
2026-03-19 09:25:46 +01:00
go WatchExecDeadline ( exec . GetID ( ) , exec . ExecutionsID , exec . ExecDate , selfID , request )
2026-03-17 11:58:27 +01:00
}
2026-01-14 15:15:26 +01:00
2026-03-17 11:58:27 +01:00
obj , _ , _ := workflow . NewAccessor ( request ) . LoadOne ( wfID )
if obj == nil {
return ws , nil , executions , nil
2026-01-14 15:15:26 +01:00
}
2026-03-17 11:58:27 +01:00
wf := obj . ( * workflow . Workflow )
ws . Workflow = wf
ws . WorkflowExecution = executions
wf . GetAccessor ( adminReq ) . UpdateOne ( wf . Serialize ( wf ) , wf . GetID ( ) )
return ws , wf , executions , nil
2026-01-14 15:15:26 +01:00
}
2026-03-17 11:58:27 +01:00
// Schedule must be called from a Check session (ws.UUID set above).
// Direct scheduling without a prior Check session is not supported.
return ws , nil , [ ] * workflow_execution . WorkflowExecution { } , errors . New ( "no scheduling session: use the Check stream first" )
2026-01-14 15:15:26 +01:00
}
2026-02-23 18:10:47 +01:00
// propagateResource routes a purchase or booking to its destination:
// - If destPeerID matches our own peer (selfMongoID), the object is stored
// directly in the local DB as draft and the local planner is refreshed.
// - Otherwise a NATS CREATE_RESOURCE message is emitted so the destination
// peer can process it asynchronously.
//
2026-03-17 11:58:27 +01:00
// The caller is responsible for setting obj.IsDraft before calling.
2026-02-23 18:10:47 +01:00
func propagateResource ( obj utils . DBObject , destPeerID string , dt tools . DataType , selfMongoID * peer . Peer , request * tools . APIRequest , errCh chan error ) {
if destPeerID == selfMongoID . GetID ( ) {
2026-03-17 11:58:27 +01:00
stored := oclib . NewRequestAdmin ( oclib . LibDataEnum ( dt ) , nil ) . StoreOne ( obj . Serialize ( obj ) )
if stored . Err != "" || stored . Data == nil {
errCh <- fmt . Errorf ( "could not store %s locally: %s" , dt . String ( ) , stored . Err )
2026-02-23 18:10:47 +01:00
return
}
// The planner tracks booking time-slots only; purchases do not affect it.
if dt == tools . BOOKING {
go refreshSelfPlanner ( selfMongoID . PeerID , request )
}
errCh <- nil
2026-01-14 15:15:26 +01:00
return
}
2026-03-17 11:58:27 +01:00
m := obj . Serialize ( obj )
if m [ "dest_peer_id" ] != nil {
if data := oclib . NewRequestAdmin ( oclib . LibDataEnum ( oclib . PEER ) , nil ) . LoadOne ( fmt . Sprintf ( "%v" , m [ "dest_peer_id" ] ) ) ; data . Data != nil {
m [ "peer_id" ] = data . Data . ( * peer . Peer ) . PeerID
}
} else {
fmt . Println ( "NO DEST ID" )
return
}
payload , err := json . Marshal ( m )
2026-02-23 18:10:47 +01:00
if err != nil {
errCh <- fmt . Errorf ( "could not serialize %s: %w" , dt . String ( ) , err )
2026-01-14 15:15:26 +01:00
return
}
2026-03-17 11:58:27 +01:00
if b , err := json . Marshal ( & tools . PropalgationMessage {
DataType : dt . EnumIndex ( ) ,
Action : tools . PB_CREATE ,
2026-02-23 18:10:47 +01:00
Payload : payload ,
2026-03-17 11:58:27 +01:00
} ) ; err == nil {
tools . NewNATSCaller ( ) . SetNATSPub ( tools . PROPALGATION_EVENT , tools . NATSResponse {
FromApp : "oc-scheduler" ,
Datatype : dt ,
Method : int ( tools . PROPALGATION_EVENT ) ,
Payload : b ,
} )
}
2026-01-14 15:15:26 +01:00
errCh <- nil
}
/ *
* getExecutions is a function that returns the executions of a workflow
* it returns an array of workflow_execution . WorkflowExecution
* /
func ( ws * WorkflowSchedule ) GetExecutions ( workflow * workflow . Workflow , isPreemptible bool ) ( [ ] * workflow_execution . WorkflowExecution , error ) {
workflows_executions := [ ] * workflow_execution . WorkflowExecution { }
dates , err := ws . GetDates ( )
if err != nil {
return workflows_executions , err
}
for _ , date := range dates {
obj := & workflow_execution . WorkflowExecution {
AbstractObject : utils . AbstractObject {
UUID : uuid . New ( ) . String ( ) , // set the uuid of the execution
Name : workflow . Name + "_execution_" + date . Start . String ( ) , // set the name of the execution
} ,
Priority : 1 ,
ExecutionsID : ws . UUID ,
ExecDate : date . Start , // set the execution date
EndDate : date . End , // set the end date
State : enum . DRAFT , // set the state to 1 (scheduled)
WorkflowID : workflow . GetID ( ) , // set the workflow id dependancy of the execution
}
if ws . BookingMode != booking . PLANNED {
obj . Priority = 0
}
if ws . BookingMode == booking . PREEMPTED && isPreemptible {
obj . Priority = 7
}
ws . SelectedStrategies = obj . SelectedStrategies
ws . SelectedPartnerships = obj . SelectedPartnerships
ws . SelectedBuyings = obj . SelectedBuyings
ws . SelectedInstances = obj . SelectedInstances
workflows_executions = append ( workflows_executions , obj )
}
return workflows_executions , nil
}
func ( ws * WorkflowSchedule ) GetDates ( ) ( [ ] Schedule , error ) {
schedule := [ ] Schedule { }
if len ( ws . Cron ) > 0 { // if cron is set then end date should be set
if ws . End == nil {
return schedule , errors . New ( "a cron task should have an end date" )
}
if ws . DurationS <= 0 {
ws . DurationS = ws . End . Sub ( ws . Start ) . Seconds ( )
}
cronStr := strings . Split ( ws . Cron , " " ) // split the cron string to treat it
if len ( cronStr ) < 6 { // if the cron string is less than 6 fields, return an error because format is : ss mm hh dd MM dw (6 fields)
return schedule , errors . New ( "Bad cron message: (" + ws . Cron + "). Should be at least ss mm hh dd MM dw" )
}
subCron := strings . Join ( cronStr [ : 6 ] , " " )
// cron should be parsed as ss mm hh dd MM dw t (min 6 fields)
specParser := cron . NewParser ( cron . Second | cron . Minute | cron . Hour | cron . Dom | cron . Month | cron . Dow ) // create a new cron parser
sched , err := specParser . Parse ( subCron ) // parse the cron string
if err != nil {
return schedule , errors . New ( "Bad cron message: " + err . Error ( ) )
}
// loop through the cron schedule to set the executions
for s := sched . Next ( ws . Start ) ; ! s . IsZero ( ) && s . Before ( * ws . End ) ; s = sched . Next ( s ) {
e := s . Add ( time . Duration ( ws . DurationS ) * time . Second )
schedule = append ( schedule , Schedule {
Start : s ,
End : & e ,
} )
}
} else { // if no cron, set the execution to the start date
schedule = append ( schedule , Schedule {
Start : ws . Start ,
End : ws . End ,
} )
}
return schedule , nil
}
type Schedule struct {
Start time . Time
End * time . Time
}
/ *
* TODO : LARGEST GRAIN PLANIFYING THE WORKFLOW WHEN OPTION IS SET
* SET PROTECTION BORDER TIME
* /