package controllers

import (
	"encoding/json"
	"fmt"
	"net/http"
	"oc-scheduler/infrastructure"
	"reflect"
	"strings"

	oclib "cloud.o-forge.io/core/oc-lib"
	"cloud.o-forge.io/core/oc-lib/dbs"
	"cloud.o-forge.io/core/oc-lib/tools"
	beego "github.com/beego/beego/v2/server/web"
	"github.com/google/uuid"
	gorillaws "github.com/gorilla/websocket"
)

// orderCollection is the oc-lib data type queried by SearchScheduledDraftOrder.
var orderCollection = oclib.LibDataEnum(oclib.ORDER)

// logger is the shared oc-lib logger used by the handlers of this file.
var logger = oclib.GetLogger()

// Operations about workflow
type WorkflowSchedulerController struct {
	beego.Controller
}

// wsUpgrader upgrades incoming HTTP requests to WebSocket connections.
//
// NOTE(review): CheckOrigin accepts every origin, which disables the
// cross-origin protection of the upgrader. Confirm that origin filtering is
// enforced upstream (gateway / reverse proxy) or restrict it here.
var wsUpgrader = gorillaws.Upgrader{
	CheckOrigin: func(r *http.Request) bool { return true },
}

// CheckStreamHandler is the WebSocket handler for slot availability checking.
// It is invoked via the CheckStream controller method.
// Query params: as_possible=true, preemption=true
//
// The workflow id is taken from the request path (prefix "/oc/" and suffix
// "/check" are stripped). Before the upgrade, failures are reported as HTTP
// errors with a JSON body; after the upgrade the first JSON message received
// is the WorkflowSchedule to check. The handler then pushes a check result
// after every client update, planner update or workflow update, until the
// client confirms, the connection drops or a check/write fails. Unless the
// session was confirmed, CleanupSession is called on exit.
func CheckStreamHandler(w http.ResponseWriter, r *http.Request) {
	wfID := strings.TrimSuffix(
		strings.TrimPrefix(r.URL.Path, "/oc/"),
		"/check",
	)

	q := r.URL.Query()
	asap := q.Get("as_possible") == "true"
	preemption := q.Get("preemption") == "true"

	user, peerID, groups := oclib.ExtractTokenInfo(*r)
	req := &tools.APIRequest{
		Username: user,
		PeerID:   peerID,
		Groups:   groups,
		Caller:   nil,
		Admin:    true,
	}

	// fail reports a pre-upgrade error. The body is marshalled rather than
	// concatenated so that quotes or backslashes in msg cannot produce
	// invalid JSON.
	fail := func(status int, msg string) {
		// Marshalling a struct of int and string cannot fail.
		payload, _ := json.Marshal(struct {
			Code  int    `json:"code"`
			Error string `json:"error"`
		}{Code: status, Error: msg})
		http.Error(w, string(payload), status)
	}

	watchedPeers, err := infrastructure.GetWorkflowPeerIDs(wfID, req)
	if err != nil {
		fail(http.StatusNotFound, err.Error())
		return
	}
	logger.Debug().Msg(fmt.Sprintf("watched peers involved in workflow %s: %v", wfID, watchedPeers))

	// Resolve the local peer before upgrading and subscribing: failing here
	// must not leave an open connection, live subscriptions or claimed
	// refresh ownership behind.
	selfID, err := oclib.GetMySelf()
	if err != nil || selfID == nil {
		msg := "local peer identity unavailable"
		if err != nil {
			msg = err.Error()
		}
		logger.Err(err).Msg(msg)
		fail(http.StatusInternalServerError, msg)
		return
	}
	selfPeerID := selfID.PeerID

	conn, err := wsUpgrader.Upgrade(w, r, nil)
	if err != nil {
		// Upgrade has already replied to the client with an HTTP error.
		return
	}

	var ws infrastructure.WorkflowSchedule
	if err := conn.ReadJSON(&ws); err != nil {
		conn.Close()
		return
	}

	plannerCh, plannerUnsub := infrastructure.SubscribePlannerUpdates(watchedPeers)
	wfCh, wfUnsub := infrastructure.SubscribeWorkflowUpdates(wfID)

	executionsID := uuid.New().String()
	ownedPeers := infrastructure.RequestPlannerRefresh(watchedPeers, executionsID)

	// scheduled=true once bookings/purchases/exec have been created for this session.
	scheduled := false
	confirmed := false

	// The closure reads plannerUnsub and ownedPeers when it runs, so it sees
	// the values re-assigned by the workflow-update branch below.
	defer func() {
		conn.Close()
		plannerUnsub()
		wfUnsub()
		infrastructure.ReleaseRefreshOwnership(ownedPeers, executionsID)
		if !confirmed {
			infrastructure.CleanupSession(selfID, executionsID, selfID, req)
		}
	}()

	// pushCheck runs an availability check and sends the result to the client.
	// If reschedule=true and the slot is available, it also creates/updates
	// bookings, purchases and the execution draft for this session.
	pushCheck := func(reschedule bool) error {
		result, checkErr := ws.Check(wfID, asap, preemption, req)
		if checkErr != nil {
			return checkErr
		}
		if result.Available && reschedule {
			// Sync the resolved start/end back to ws so that UpsertSessionDrafts
			// creates bookings/purchases with the actual scheduled dates (not the
			// raw client value which may be zero or pre-asapBuffer).
			ws.Start = result.Start
			if result.End != nil {
				ws.End = result.End
			}
			ws.UpsertSessionDrafts(wfID, executionsID, selfID, req)
			scheduled = true
		}
		result.SchedulingID = executionsID
		return conn.WriteJSON(result)
	}

	// Initial check + schedule.
	if err := pushCheck(true); err != nil {
		return
	}

	updateCh := make(chan infrastructure.WorkflowSchedule, 1)
	closeCh := make(chan struct{})

	// Reader goroutine: the only reader of conn from here on. It ends when
	// ReadJSON fails, which happens at the latest when the deferred
	// conn.Close() runs.
	go func() {
		defer close(closeCh)
		for {
			var updated infrastructure.WorkflowSchedule
			if err := conn.ReadJSON(&updated); err != nil {
				return
			}
			// Keep only the most recent update. The drain is non-blocking:
			// the main loop may have emptied the channel in the meantime,
			// and a blocking receive would then wait forever. As this
			// goroutine is the only sender on a channel of capacity 1, the
			// send that follows cannot block.
			select {
			case <-updateCh:
			default:
			}
			updateCh <- updated
		}
	}()

	for {
		select {
		case updated := <-updateCh:
			if updated.Confirm {
				// Confirm: flip bookings/purchases to IsDraft=false, then let
				// the considers mechanism transition exec to IsDraft=false.
				ws.UUID = executionsID
				_, _, _, schedErr := ws.Schedules(wfID, req)
				if schedErr != nil {
					// Best-effort notification; the handler exits either way.
					_ = conn.WriteJSON(map[string]interface{}{
						"error": schedErr.Error(),
					})
					return
				}
				confirmed = true
				return
			}

			changed := updated.Cron != ws.Cron ||
				!updated.Start.Equal(ws.Start) ||
				updated.DurationS != ws.DurationS ||
				(updated.End == nil) != (ws.End == nil) ||
				(updated.End != nil && ws.End != nil && !updated.End.Equal(*ws.End)) ||
				updated.BookingMode != ws.BookingMode ||
				!reflect.DeepEqual(updated.SelectedBillingStrategy, ws.SelectedBillingStrategy) ||
				!reflect.DeepEqual(updated.SelectedInstances, ws.SelectedInstances) ||
				!reflect.DeepEqual(updated.SelectedPartnerships, ws.SelectedPartnerships) ||
				!reflect.DeepEqual(updated.SelectedBuyings, ws.SelectedBuyings) ||
				!reflect.DeepEqual(updated.SelectedStrategies, ws.SelectedStrategies)

			// NOTE(review): the session is cleaned up on every update, but
			// drafts are only re-created when changed || !scheduled. Confirm
			// that cleaning up for an unchanged, already scheduled request
			// is intended.
			infrastructure.CleanupSession(selfID, executionsID, selfID, req)
			ws = updated
			if err := pushCheck(changed || !scheduled); err != nil {
				return
			}

		case remotePeerID := <-plannerCh:
			if remotePeerID == selfPeerID {
				// Our own planner updated (caused by our local booking store).
				// Just resend the current availability result without rescheduling
				// to avoid an infinite loop.
				result, checkErr := ws.Check(wfID, asap, preemption, req)
				if checkErr == nil {
					result.SchedulingID = executionsID
					// Best-effort: a broken connection ends the reader
					// goroutine, which closes closeCh.
					_ = conn.WriteJSON(result)
				}
				continue
			}

			// A remote peer's planner changed. Re-check; if our slot is now
			// taken and we were already scheduled, reschedule at the new slot.
			result, checkErr := ws.Check(wfID, asap, preemption, req)
			if checkErr != nil {
				return
			}
			if !result.Available && scheduled {
				// Move to the next free slot and reschedule.
				if result.NextSlot != nil {
					ws.Start = *result.NextSlot
				}
				if err := pushCheck(true); err != nil {
					return
				}
			} else {
				result.SchedulingID = executionsID
				// Best-effort, see above.
				_ = conn.WriteJSON(result)
			}

		case <-wfCh:
			// The workflow changed: refresh the watched peer set and move the
			// planner subscription onto it. On lookup failure the previous
			// subscription is kept.
			if newPeers, err := infrastructure.GetWorkflowPeerIDs(wfID, req); err == nil {
				plannerUnsub()
				watchedPeers = newPeers
				plannerCh, plannerUnsub = infrastructure.SubscribePlannerUpdates(newPeers)
				newOwned := infrastructure.RequestPlannerRefresh(newPeers, executionsID)
				ownedPeers = append(ownedPeers, newOwned...)
			}
			if err := pushCheck(false); err != nil {
				return
			}

		case <-closeCh:
			// The reader goroutine stopped: the client is gone.
			return
		}
	}
}

// @Title UnSchedule
// @Description unschedule a workflow execution: deletes its bookings on all peers then deletes the execution.
// @Param	id		path 	string	true		"execution id"
// @Success 200 {object} map[string]interface{}
// @router /:id [delete]
func (o *WorkflowSchedulerController) UnSchedule() {
	user, peerID, groups := oclib.ExtractTokenInfo(*o.Ctx.Request)
	executionID := o.Ctx.Input.Param(":id")
	req := &tools.APIRequest{
		Username: user,
		PeerID:   peerID,
		Groups:   groups,
		Admin:    true,
	}

	// A missing local identity is reported to the caller instead of being
	// passed on as nil, matching what CheckStreamHandler does.
	selfID, err := oclib.GetMySelf()
	if err != nil || selfID == nil {
		msg := "local peer identity unavailable"
		if err != nil {
			msg = err.Error()
		}
		o.Data["json"] = map[string]interface{}{"code": 500, "error": msg}
		o.ServeJSON()
		return
	}

	// The outcome is carried by the "code" field of the JSON body.
	if err := infrastructure.UnscheduleExecution(executionID, selfID, req); err != nil {
		o.Data["json"] = map[string]interface{}{"code": 404, "error": err.Error()}
	} else {
		o.Data["json"] = map[string]interface{}{"code": 200, "error": ""}
	}
	o.ServeJSON()
}

// @Title SearchScheduledDraftOrder
// @Description search the orders whose workflow_id is the given id and whose order_by is the calling peer
// @Param	id		path 	string	true		"id execution"
// @Success 200 {workspace} models.workspace
// @router /:id/order [get]
func (o *WorkflowSchedulerController) SearchScheduledDraftOrder() {
	_, peerID, _ := oclib.ExtractTokenInfo(*o.Ctx.Request)
	id := o.Ctx.Input.Param(":id")
	// Both conditions must hold: the order targets the requested id and was
	// placed by the peer found in the caller's token.
	// NOTE(review): the route documents :id as an execution id while the
	// filter matches it against "workflow_id" - confirm which one is meant.
	filter := &dbs.Filters{
		And: map[string][]dbs.Filter{
			"workflow_id": {{Operator: dbs.EQUAL.String(), Value: id}},
			"order_by":    {{Operator: dbs.EQUAL.String(), Value: peerID}},
		},
	}
	// The search runs through an admin request; the caller's identity only
	// restricts the result through the "order_by" condition above.
	o.Data["json"] = oclib.NewRequestAdmin(orderCollection, nil).Search(filter, "", true)

	//o.Data["json"] = oclib.NewRequest(orderCollection, user, peerID, groups, nil).Search(filter, "", true)
	o.ServeJSON()
}