package infrastructure import ( "context" "encoding/json" "fmt" "oc-scheduler/conf" "oc-scheduler/infrastructure/scheduling" "time" oclib "cloud.o-forge.io/core/oc-lib" "cloud.o-forge.io/core/oc-lib/dbs" "cloud.o-forge.io/core/oc-lib/models/booking" "cloud.o-forge.io/core/oc-lib/models/order" "cloud.o-forge.io/core/oc-lib/models/peer" "cloud.o-forge.io/core/oc-lib/models/resources/purchase_resource" "cloud.o-forge.io/core/oc-lib/models/utils" "cloud.o-forge.io/core/oc-lib/models/workflow_execution" "cloud.o-forge.io/core/oc-lib/tools" ) // removeResourcePayload is sent via NATS REMOVE_RESOURCE so the receiver can // verify the delete order comes from the original scheduler session. type removeResourcePayload struct { ID string `json:"id"` SchedulerPeerID string `json:"scheduler_peer_id"` ExecutionsID string `json:"executions_id"` } // --------------------------------------------------------------------------- // DB helpers — objects are found via executions_id // --------------------------------------------------------------------------- func sessionIDFilter(field, id string) *dbs.Filters { return &dbs.Filters{ And: map[string][]dbs.Filter{ field: {{Operator: dbs.EQUAL.String(), Value: id}}, }, } } func loadSession(executionsID string, dt tools.DataType) []scheduling.SchedulerObject { results := oclib.NewRequestAdmin(oclib.LibDataEnum(dt), nil).Search( sessionIDFilter("executions_id", executionsID), "", true) out := make([]scheduling.SchedulerObject, 0, len(results.Data)) for _, obj := range results.Data { out = append(out, scheduling.ToSchedulerObject(dt, obj)) } return out } func loadSessionExecs(executionsID string) []*workflow_execution.WorkflowExecution { adminReq := &tools.APIRequest{Admin: true} results, _, _ := workflow_execution.NewAccessor(adminReq).Search( sessionIDFilter("executions_id", executionsID), "", true) out := make([]*workflow_execution.WorkflowExecution, 0, len(results)) for _, obj := range results { if exec, ok := obj.(*workflow_execution.WorkflowExecution); ok { out = append(out, exec) } } return out } func loadSessionOrder(executionsID string) *order.Order { adminReq := &tools.APIRequest{Admin: true} results, _, _ := order.NewAccessor(adminReq).Search( sessionIDFilter("executions_id", executionsID), "", true) for _, obj := range results { if o, ok := obj.(*order.Order); ok { return o } } return nil } // --------------------------------------------------------------------------- // Session upsert // --------------------------------------------------------------------------- // UpsertSessionDrafts creates or updates draft bookings/purchases/executions for a // Check session. Existing objects are found via the DB (executions_id). // Called on first successful check and on user date changes. // // - bookings/purchases: upserted by (resourceID, instanceID); stale ones deleted // - executions: replaced on every call (dates may have changed) // - order: created once, updated on subsequent calls func (ws *WorkflowSchedule) UpsertSessionDrafts(wfID, executionsID string, selfID *peer.Peer, request *tools.APIRequest) { _, _, execs, purchases, bookings, err := ws.GetBuyAndBook(wfID, request) if err != nil { return } adminReq := &tools.APIRequest{Admin: true} // --- bookings --- existing := map[string]scheduling.SchedulerObject{} seen := map[string]bool{} for dt, datas := range map[tools.DataType][]scheduling.SchedulerObject{ tools.BOOKING: bookings, tools.PURCHASE_RESOURCE: purchases, } { for _, bk := range loadSession(executionsID, dt) { existing[bk.GetKey()] = bk } upsertSessionDrafts(dt, datas, existing, seen, selfID, executionsID, request) for key, prev := range existing { if !seen[key] { deleteScheduling(dt, prev, selfID, request) } } } // --- executions: replace on every call (dates may have changed) --- for _, old := range loadSessionExecs(executionsID) { UnregisterExecLock(old.GetID()) workflow_execution.NewAccessor(adminReq).DeleteOne(old.GetID()) } for _, exec := range execs { exec.ExecutionsID = executionsID exec.IsDraft = true ex, _, err := utils.GenericStoreOne(exec, workflow_execution.NewAccessor(adminReq)) if err == nil { RegisterExecLock(ex.GetID()) go WatchExecDeadline( ex.GetID(), executionsID, exec.ExecDate, selfID, request) } } // --- order: create once, update on subsequent calls --- if existing := loadSessionOrder(executionsID); existing == nil { ws.GenerateOrder(purchases, bookings, executionsID, request) } else { for _, purch := range purchases { existing.Purchases = append( existing.Purchases, scheduling.FromSchedulerObject(tools.PURCHASE_RESOURCE, purch).(*purchase_resource.PurchaseResource)) } for _, b := range bookings { existing.Bookings = append( existing.Bookings, scheduling.FromSchedulerObject(tools.BOOKING, b).(*booking.Booking)) } utils.GenericRawUpdateOne(existing, existing.GetID(), order.NewAccessor(adminReq)) } } // --------------------------------------------------------------------------- // Session lifecycle // --------------------------------------------------------------------------- func upsertSessionDrafts(dt tools.DataType, datas []scheduling.SchedulerObject, existing map[string]scheduling.SchedulerObject, seen map[string]bool, selfID *peer.Peer, executionsID string, request *tools.APIRequest) { fmt.Println("UpsertSessionDrafts", len(datas), len(existing)) for _, bk := range datas { bk.SetSchedulerPeerID(selfID.PeerID) bk.SetExecutionsID(executionsID) seen[bk.GetKey()] = true if prev, ok := existing[bk.GetKey()]; ok { bk.SetID(prev.GetID()) bk.SetIsDraft(false) // Convert to concrete type (Booking/PurchaseResource) so that // GenericRawUpdateOne serializes the real struct, not the wrapper. propagateWriteResource( scheduling.FromSchedulerDBObject(dt, bk), bk.GetDestPeer(), dt, selfID, request) } else { errCh := make(chan error, 1) propagateResource(scheduling.FromSchedulerDBObject(dt, bk), bk.GetDestPeer(), dt, selfID, request, errCh) <-errCh } } } // CleanupSession deletes all draft bookings/purchases/executions/order for a // session (called when the WebSocket closes without a confirm). func CleanupSession(self *peer.Peer, executionsID string, selfID *peer.Peer, request *tools.APIRequest) { adminReq := &tools.APIRequest{Admin: true} for _, exec := range loadSessionExecs(executionsID) { UnscheduleExecution(exec.GetID(), selfID, request) workflow_execution.NewAccessor(adminReq).DeleteOne(exec.GetID()) } if o := loadSessionOrder(executionsID); o != nil { order.NewAccessor(adminReq).DeleteOne(o.GetID()) } } // ConfirmSession flips all session drafts to IsDraft=false and propagates them. // The considers mechanism then transitions executions to IsDraft=false once // all remote peers acknowledge. func ConfirmSession(executionsID string, selfID *peer.Peer, request *tools.APIRequest) error { for _, dt := range []tools.DataType{tools.BOOKING, tools.PURCHASE_RESOURCE} { for _, bk := range loadSession(executionsID, dt) { bk.SetIsDraft(false) propagateWriteResource( scheduling.FromSchedulerDBObject(dt, bk), bk.GetDestPeer(), dt, selfID, request) } } return nil } // confirmSessionOrder sets the order IsDraft=false once all considers are received. func confirmSessionOrder(executionsID string, adminReq *tools.APIRequest) { if o := loadSessionOrder(executionsID); o != nil { o.IsDraft = false utils.GenericRawUpdateOne(o, o.GetID(), order.NewAccessor(adminReq)) } } // --------------------------------------------------------------------------- // Propagation // --------------------------------------------------------------------------- // propagateWriteResource routes a booking/purchase write to its destination: // - local peer → DB upsert; emits considers on confirm (IsDraft=false) // - remote peer → NATS CREATE_RESOURCE (receiver upserts) func propagateWriteResource(obj utils.DBObject, destPeerID string, dt tools.DataType, selfID *peer.Peer, request *tools.APIRequest) { if destPeerID == selfID.GetID() { if _, _, err := utils.GenericRawUpdateOne(obj, obj.GetID(), obj.GetAccessor(request)); err != nil { fmt.Printf("propagateWriteResource: local update failed for %s %s: %v\n", dt, obj.GetID(), err) return } if dt == tools.BOOKING { go refreshSelfPlanner(selfID.PeerID, request) } fmt.Println("IS DRAFTED", obj.IsDrafted()) if !obj.IsDrafted() { if payload, err := json.Marshal(&executionConsidersPayload{ ID: obj.GetID(), }); err == nil { go updateExecutionState(payload, dt) } } return } payload, err := json.Marshal(obj) if err != nil { return } tools.NewNATSCaller().SetNATSPub(tools.CREATE_RESOURCE, tools.NATSResponse{ FromApp: "oc-scheduler", Datatype: dt, Method: int(tools.CREATE_RESOURCE), Payload: payload, }) } // deleteBooking deletes a booking from its destination peer (local DB or NATS). func deleteScheduling(dt tools.DataType, bk scheduling.SchedulerObject, selfID *peer.Peer, request *tools.APIRequest) { if bk.GetDestPeer() == selfID.GetID() { oclib.NewRequestAdmin(oclib.LibDataEnum(dt), nil).DeleteOne(bk.GetID()) go refreshSelfPlanner(selfID.PeerID, request) return } emitNATSRemove(bk.GetID(), bk.GetPeerSession(), bk.GetExecutionsId(), dt) } // emitNATSRemove sends a REMOVE_RESOURCE event to the remote peer carrying // auth fields so the receiver can verify the delete is legitimate. func emitNATSRemove(id, schedulerPeerID, executionsID string, dt tools.DataType) { payload, _ := json.Marshal(removeResourcePayload{ ID: id, SchedulerPeerID: schedulerPeerID, ExecutionsID: executionsID, }) tools.NewNATSCaller().SetNATSPub(tools.REMOVE_RESOURCE, tools.NATSResponse{ FromApp: "oc-scheduler", Datatype: dt, Method: int(tools.REMOVE_RESOURCE), Payload: payload, }) } // --------------------------------------------------------------------------- // Deadline watchers // --------------------------------------------------------------------------- // WatchExecDeadline fires one minute before the execution start date. // If the execution is still a draft it is purged; otherwise the namespace // is created and a WatchExecEnd watcher is armed. // If the deadline has already passed (e.g. after a process restart), it fires immediately. func WatchExecDeadline(executionID string, ns string, execDate time.Time, selfID *peer.Peer, request *tools.APIRequest) { fmt.Println("WatchExecDeadline") delay := time.Until(execDate.UTC().Add(-1 * time.Minute)) if delay <= 0 { go handleExecDeadline(executionID, ns, selfID, request) return } time.AfterFunc(delay, func() { handleExecDeadline(executionID, ns, selfID, request) }) } func handleExecDeadline(executionID string, ns string, selfID *peer.Peer, request *tools.APIRequest) { adminReq := &tools.APIRequest{Admin: true} res, _, err := workflow_execution.NewAccessor(adminReq).LoadOne(executionID) if err != nil || res == nil { fmt.Printf("handleExecDeadline: execution %s not found\n", executionID) return } exec := res.(*workflow_execution.WorkflowExecution) if exec.IsDraft { UnscheduleExecution(executionID, selfID, request) workflow_execution.NewAccessor(adminReq).DeleteOne(executionID) fmt.Printf("handleExecDeadline: purged draft execution %s\n", executionID) return } if serv, err := tools.NewKubernetesService( conf.GetConfig().KubeHost+":"+conf.GetConfig().KubePort, conf.GetConfig().KubeCA, conf.GetConfig().KubeCert, conf.GetConfig().KubeData); err != nil { fmt.Printf("handleExecDeadline: k8s init failed for %s: %v\n", executionID, err) } else if err := serv.ProvisionExecutionNamespace(context.Background(), ns); err != nil { fmt.Printf("handleExecDeadline: failed to provision namespace for %s: %v\n", ns, err) } go WatchExecEnd(executionID, ns, exec.EndDate, exec.ExecDate) } // WatchExecEnd fires at the execution end date (ExecDate+1h when EndDate is nil) // and deletes the Kubernetes namespace associated with the execution. func WatchExecEnd(executionID string, ns string, endDate *time.Time, execDate time.Time) { var end time.Time if endDate != nil { end = *endDate } else { end = execDate.UTC().Add(time.Hour) } delay := time.Until(end.UTC()) fire := func() { serv, err := tools.NewKubernetesService( conf.GetConfig().KubeHost+":"+conf.GetConfig().KubePort, conf.GetConfig().KubeCA, conf.GetConfig().KubeCert, conf.GetConfig().KubeData) if err != nil { fmt.Printf("WatchExecEnd: k8s init failed for %s: %v\n", executionID, err) return } if err := serv.TeardownExecutionNamespace(context.Background(), ns); err != nil { fmt.Printf("WatchExecEnd: failed to teardown namespace %s: %v\n", ns, err) } } if delay <= 0 { go fire() return } time.AfterFunc(delay, fire) } // RecoverDraftExecutions is called at startup to restore deadline watchers for // draft executions that survived a process restart. Executions already past // their deadline are purged immediately. func RecoverDraftExecutions() { adminReq := &tools.APIRequest{Admin: true} var selfID *peer.Peer for selfID == nil { selfID, _ = oclib.GetMySelf() if selfID == nil { time.Sleep(5 * time.Second) } } results, _, _ := workflow_execution.NewAccessor(adminReq).Search(nil, "*", true) for _, obj := range results { exec, ok := obj.(*workflow_execution.WorkflowExecution) if !ok { continue } RegisterExecLock(exec.GetID()) go WatchExecDeadline(exec.GetID(), exec.ExecutionsID, exec.ExecDate, selfID, adminReq) } fmt.Printf("RecoverDraftExecutions: recovered %d draft executions\n", len(results)) } // --------------------------------------------------------------------------- // Unschedule // --------------------------------------------------------------------------- // UnscheduleExecution deletes all bookings for an execution (via PeerBookByGraph) // then deletes the execution itself. func UnscheduleExecution(executionID string, selfID *peer.Peer, request *tools.APIRequest) error { fmt.Println("UnscheduleExecution") adminReq := &tools.APIRequest{Admin: true} res, _, err := workflow_execution.NewAccessor(adminReq).LoadOne(executionID) if err != nil || res == nil { return fmt.Errorf("execution %s not found: %w", executionID, err) } exec := res.(*workflow_execution.WorkflowExecution) for _, byResource := range exec.PeerBookByGraph { for _, bookingIDs := range byResource { for _, bkID := range bookingIDs { bkRes, _, loadErr := booking.NewAccessor(adminReq).LoadOne(bkID) fmt.Println("UnscheduleExecution", bkID, loadErr) if loadErr != nil || bkRes == nil { continue } deleteScheduling(tools.BOOKING, scheduling.ToSchedulerObject(tools.BOOKING, bkRes), selfID, request) } } } workflow_execution.NewAccessor(adminReq).DeleteOne(executionID) UnregisterExecLock(executionID) return nil }