// File: oc-scheduler/infrastructure/session.go

package infrastructure
import (
"encoding/json"
"fmt"
"oc-scheduler/infrastructure/scheduling"
"time"
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/models/booking"
"cloud.o-forge.io/core/oc-lib/models/order"
"cloud.o-forge.io/core/oc-lib/models/peer"
"cloud.o-forge.io/core/oc-lib/models/resources/purchase_resource"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
"cloud.o-forge.io/core/oc-lib/tools"
)
// removeResourcePayload is the JSON body published with the NATS
// REMOVE_RESOURCE event (built in emitNATSRemove). Besides the object ID it
// carries the scheduler peer and session identifiers so the receiver can
// verify the delete order comes from the original scheduler session.
type removeResourcePayload struct {
// ID is the identifier of the object to remove.
ID string `json:"id"`
// SchedulerPeerID is filled from the object's GetPeerSession() value.
SchedulerPeerID string `json:"scheduler_peer_id"`
// ExecutionsID is the session identifier stored on the object.
ExecutionsID string `json:"executions_id"`
}
// ---------------------------------------------------------------------------
// DB helpers — objects are found via executions_id
// ---------------------------------------------------------------------------
// sessionIDFilter builds an AND filter with a single equality condition:
// the given field must be equal to id.
func sessionIDFilter(field, id string) *dbs.Filters {
	equalsID := dbs.Filter{Operator: dbs.EQUAL.String(), Value: id}
	return &dbs.Filters{
		And: map[string][]dbs.Filter{field: {equalsID}},
	}
}
// loadSession searches, with an admin request, every object of data type dt
// whose executions_id equals executionsID and returns them wrapped as
// scheduler objects. The result is never nil (empty slice when nothing
// matches).
func loadSession(executionsID string, dt tools.DataType) []scheduling.SchedulerObject {
	filter := sessionIDFilter("executions_id", executionsID)
	found := oclib.NewRequestAdmin(oclib.LibDataEnum(dt), nil).Search(filter, "", true)

	objects := make([]scheduling.SchedulerObject, 0, len(found.Data))
	for i := range found.Data {
		objects = append(objects, scheduling.ToSchedulerObject(dt, found.Data[i]))
	}
	return objects
}
// loadSessionExecs returns the workflow executions whose executions_id equals
// executionsID, looked up with an admin request. Results that are not
// *WorkflowExecution are skipped. The result is never nil.
func loadSessionExecs(executionsID string) []*workflow_execution.WorkflowExecution {
	accessor := workflow_execution.NewAccessor(&tools.APIRequest{Admin: true})
	// The status code and error of Search are discarded: whatever results
	// come back (possibly none) are used as-is.
	found, _, _ := accessor.Search(sessionIDFilter("executions_id", executionsID), "", true)

	execs := make([]*workflow_execution.WorkflowExecution, 0, len(found))
	for _, item := range found {
		exec, isExec := item.(*workflow_execution.WorkflowExecution)
		if !isExec {
			continue
		}
		execs = append(execs, exec)
	}
	return execs
}
// loadSessionOrder returns the first order whose executions_id equals
// executionsID, or nil when the search yields no *order.Order.
func loadSessionOrder(executionsID string) *order.Order {
	accessor := order.NewAccessor(&tools.APIRequest{Admin: true})
	// The status code and error of Search are discarded: a lookup that
	// returns nothing is reported as "no order" (nil).
	found, _, _ := accessor.Search(sessionIDFilter("executions_id", executionsID), "", true)

	for _, item := range found {
		sessionOrder, isOrder := item.(*order.Order)
		if isOrder {
			return sessionOrder
		}
	}
	return nil
}
// ---------------------------------------------------------------------------
// Session upsert
// ---------------------------------------------------------------------------
// UpsertSessionDrafts creates or updates draft bookings/purchases/executions for a
// Check session. Existing objects are found via the DB (executions_id).
// Called on first successful check and on user date changes.
//
//   - bookings/purchases: upserted by key (GetKey); stale ones deleted
//   - executions: replaced on every call (dates may have changed)
//   - order: created once; on subsequent calls its purchases/bookings are
//     replaced by the current ones
//
// The function has no return value: failures are logged and the affected
// step is skipped.
func (ws *WorkflowSchedule) UpsertSessionDrafts(wfID, executionsID string, selfID *peer.Peer, request *tools.APIRequest) {
	_, _, execs, purchases, bookings, err := ws.GetBuyAndBook(wfID, request)
	if err != nil {
		fmt.Printf("UpsertSessionDrafts: GetBuyAndBook failed for workflow %s: %v\n", wfID, err)
		return
	}
	adminReq := &tools.APIRequest{Admin: true}

	// --- bookings / purchases ---
	// Each data type gets its own existing/seen maps: sharing them would let
	// a booking be matched against a purchase with the same key, and would
	// run the stale-delete pass of one type over the entries of the other.
	groups := []struct {
		dt    tools.DataType
		datas []scheduling.SchedulerObject
	}{
		{tools.BOOKING, bookings},
		{tools.PURCHASE_RESOURCE, purchases},
	}
	for _, group := range groups {
		existing := map[string]scheduling.SchedulerObject{}
		for _, prev := range loadSession(executionsID, group.dt) {
			existing[prev.GetKey()] = prev
		}
		seen := make(map[string]bool, len(group.datas))
		upsertSessionDrafts(group.dt, group.datas, existing, seen, selfID, executionsID, request)
		for key, prev := range existing {
			if !seen[key] {
				deleteScheduling(group.dt, prev, selfID, request)
			}
		}
	}

	// --- executions: replace on every call (dates may have changed) ---
	for _, old := range loadSessionExecs(executionsID) {
		UnregisterExecLock(old.GetID())
		workflow_execution.NewAccessor(adminReq).DeleteOne(old.GetID())
	}
	for _, exec := range execs {
		exec.ExecutionsID = executionsID
		exec.IsDraft = true
		stored, _, storeErr := utils.GenericStoreOne(exec, workflow_execution.NewAccessor(adminReq))
		if storeErr != nil {
			fmt.Printf("UpsertSessionDrafts: cannot store draft execution for session %s: %v\n", executionsID, storeErr)
			continue
		}
		RegisterExecLock(stored.GetID())
		go WatchExecDeadline(stored.GetID(), exec.ExecDate, selfID, request)
	}

	// --- order: create once, update on subsequent calls ---
	sessionOrder := loadSessionOrder(executionsID)
	if sessionOrder == nil {
		ws.GenerateOrder(purchases, bookings, executionsID, request)
		return
	}
	// Replace rather than append: this function runs on every date change,
	// so appending would duplicate every purchase/booking in the order.
	sessionOrder.Purchases = sessionOrder.Purchases[:0]
	for _, purch := range purchases {
		concrete, ok := scheduling.FromSchedulerObject(tools.PURCHASE_RESOURCE, purch).(*purchase_resource.PurchaseResource)
		if !ok {
			continue
		}
		sessionOrder.Purchases = append(sessionOrder.Purchases, concrete)
	}
	sessionOrder.Bookings = sessionOrder.Bookings[:0]
	for _, b := range bookings {
		concrete, ok := scheduling.FromSchedulerObject(tools.BOOKING, b).(*booking.Booking)
		if !ok {
			continue
		}
		sessionOrder.Bookings = append(sessionOrder.Bookings, concrete)
	}
	if _, _, updErr := utils.GenericRawUpdateOne(sessionOrder, sessionOrder.GetID(), order.NewAccessor(adminReq)); updErr != nil {
		fmt.Printf("UpsertSessionDrafts: cannot update order %s: %v\n", sessionOrder.GetID(), updErr)
	}
}
// ---------------------------------------------------------------------------
// Session lifecycle
// ---------------------------------------------------------------------------
// upsertSessionDrafts writes every object of datas for the session
// executionsID and records its key in seen.
//
//   - an object whose key is already in existing reuses the stored ID and is
//     sent through propagateWriteResource (update path)
//   - any other object is sent through propagateResource (create path); the
//     call waits for the result on an error channel and logs a failure
//
// Each object is stamped with the scheduler peer ID (selfID.PeerID) and the
// session ID before being written. existing is only read; seen is filled in
// place so the caller can detect stale entries afterwards.
func upsertSessionDrafts(dt tools.DataType, datas []scheduling.SchedulerObject, existing map[string]scheduling.SchedulerObject,
	seen map[string]bool, selfID *peer.Peer,
	executionsID string, request *tools.APIRequest) {
	fmt.Println("UpsertSessionDrafts", len(datas), len(existing))
	for _, bk := range datas {
		bk.SetSchedulerPeerID(selfID.PeerID)
		bk.SetExecutionsID(executionsID)
		key := bk.GetKey()
		seen[key] = true

		if prev, ok := existing[key]; ok {
			bk.SetID(prev.GetID())
			// NOTE(review): the update path clears the draft flag, which makes
			// propagateWriteResource emit considers for local objects — confirm
			// this is intended while the session is still unconfirmed.
			bk.SetIsDraft(false)
			// Convert to concrete type (Booking/PurchaseResource) so that
			// GenericRawUpdateOne serializes the real struct, not the wrapper.
			propagateWriteResource(
				scheduling.FromSchedulerDBObject(dt, bk), bk.GetDestPeer(), dt, selfID, request)
			continue
		}

		errCh := make(chan error, 1)
		propagateResource(scheduling.FromSchedulerDBObject(dt, bk), bk.GetDestPeer(), dt, selfID, request, errCh)
		// Wait for the outcome and surface a failure instead of dropping it.
		if err := <-errCh; err != nil {
			fmt.Printf("upsertSessionDrafts: propagation failed for %s %s: %v\n", dt, key, err)
		}
	}
}
// CleanupSession deletes all draft bookings/purchases/executions/order for a
// session (called when the WebSocket closes without a confirm).
//
//   - executions: each one is unscheduled (which removes its bookings), then
//     deleted again as a fallback in case unscheduling failed early
//   - purchases: every purchase carrying the session's executions_id is
//     deleted on its destination peer
//   - order: the session order, if any, is deleted
//
// The self parameter is not used; it is kept so existing callers compile.
func CleanupSession(self *peer.Peer, executionsID string, selfID *peer.Peer, request *tools.APIRequest) {
	adminReq := &tools.APIRequest{Admin: true}
	for _, exec := range loadSessionExecs(executionsID) {
		if err := UnscheduleExecution(exec.GetID(), selfID, request); err != nil {
			fmt.Printf("CleanupSession: unschedule failed for execution %s: %v\n", exec.GetID(), err)
		}
		// Fallback: UnscheduleExecution returns before deleting the execution
		// when it cannot load it.
		workflow_execution.NewAccessor(adminReq).DeleteOne(exec.GetID())
	}
	// UnscheduleExecution only walks bookings; purchases of the session would
	// otherwise be left behind with no execution or order referencing them.
	for _, purch := range loadSession(executionsID, tools.PURCHASE_RESOURCE) {
		deleteScheduling(tools.PURCHASE_RESOURCE, purch, selfID, request)
	}
	if o := loadSessionOrder(executionsID); o != nil {
		order.NewAccessor(adminReq).DeleteOne(o.GetID())
	}
}
// ConfirmSession clears the draft flag on every booking and purchase of the
// session and writes each one to its destination peer through
// propagateWriteResource. Bookings are processed before purchases.
// Per the surrounding design notes, executions leave the draft state later,
// through the considers mechanism, once remote peers acknowledge.
//
// It currently always returns nil.
func ConfirmSession(executionsID string, selfID *peer.Peer, request *tools.APIRequest) error {
	confirmAll := func(dt tools.DataType) {
		for _, draft := range loadSession(executionsID, dt) {
			draft.SetIsDraft(false)
			concrete := scheduling.FromSchedulerDBObject(dt, draft)
			propagateWriteResource(concrete, draft.GetDestPeer(), dt, selfID, request)
		}
	}
	confirmAll(tools.BOOKING)
	confirmAll(tools.PURCHASE_RESOURCE)
	return nil
}
// confirmSessionOrder marks the session's order as no longer a draft and
// persists it with the given request. It is a no-op when the session has no
// order. The result of the update is not inspected.
func confirmSessionOrder(executionsID string, adminReq *tools.APIRequest) {
	sessionOrder := loadSessionOrder(executionsID)
	if sessionOrder == nil {
		return
	}
	sessionOrder.IsDraft = false
	utils.GenericRawUpdateOne(sessionOrder, sessionOrder.GetID(), order.NewAccessor(adminReq))
}
// ---------------------------------------------------------------------------
// Propagation
// ---------------------------------------------------------------------------
// propagateWriteResource routes a booking/purchase write to its destination:
//   - local peer (destPeerID == selfID.GetID()) → raw DB update; for bookings
//     the self planner is refreshed asynchronously; when the object is no
//     longer a draft, considers are emitted via updateExecutionState
//   - remote peer → the object is JSON-encoded and published as a NATS
//     CREATE_RESOURCE event (receiver upserts)
//
// The function returns nothing; every failure is logged and aborts the write.
func propagateWriteResource(obj utils.DBObject, destPeerID string, dt tools.DataType, selfID *peer.Peer, request *tools.APIRequest) {
	if destPeerID == selfID.GetID() {
		if _, _, err := utils.GenericRawUpdateOne(obj, obj.GetID(), obj.GetAccessor(request)); err != nil {
			fmt.Printf("propagateWriteResource: local update failed for %s %s: %v\n", dt, obj.GetID(), err)
			return
		}
		if dt == tools.BOOKING {
			go refreshSelfPlanner(selfID.PeerID, request)
		}
		fmt.Println("IS DRAFTED", obj.IsDrafted())
		if obj.IsDrafted() {
			return
		}
		payload, err := json.Marshal(&executionConsidersPayload{
			ID: obj.GetID(),
		})
		if err != nil {
			// Without this payload the execution never learns the resource
			// was confirmed, so make the failure visible.
			fmt.Printf("propagateWriteResource: cannot encode considers for %s %s: %v\n", dt, obj.GetID(), err)
			return
		}
		go updateExecutionState(payload, dt)
		return
	}

	payload, err := json.Marshal(obj)
	if err != nil {
		// The remote peer will not receive this resource; do not fail silently.
		fmt.Printf("propagateWriteResource: cannot encode %s %s for peer %s: %v\n", dt, obj.GetID(), destPeerID, err)
		return
	}
	tools.NewNATSCaller().SetNATSPub(tools.CREATE_RESOURCE, tools.NATSResponse{
		FromApp:  "oc-scheduler",
		Datatype: dt,
		Method:   int(tools.CREATE_RESOURCE),
		Payload:  payload,
	})
}
// deleteScheduling deletes a scheduler object (booking or purchase, per dt)
// on its destination peer:
//   - remote destination → a NATS REMOVE_RESOURCE event is emitted
//   - local destination → the object is deleted from the DB with an admin
//     request and the self planner is refreshed asynchronously
func deleteScheduling(dt tools.DataType, bk scheduling.SchedulerObject, selfID *peer.Peer, request *tools.APIRequest) {
	if isRemote := bk.GetDestPeer() != selfID.GetID(); isRemote {
		emitNATSRemove(bk.GetID(), bk.GetPeerSession(), bk.GetExecutionsId(), dt)
		return
	}
	oclib.NewRequestAdmin(oclib.LibDataEnum(dt), nil).DeleteOne(bk.GetID())
	go refreshSelfPlanner(selfID.PeerID, request)
}
// emitNATSRemove publishes a REMOVE_RESOURCE event for the object id of data
// type dt. The payload is a JSON-encoded removeResourcePayload carrying the
// scheduler peer ID and session ID so the receiver can verify the delete is
// legitimate.
func emitNATSRemove(id, schedulerPeerID, executionsID string, dt tools.DataType) {
	body := removeResourcePayload{
		ID:              id,
		SchedulerPeerID: schedulerPeerID,
		ExecutionsID:    executionsID,
	}
	// The marshal error is discarded: the payload is a struct of plain strings.
	encoded, _ := json.Marshal(body)

	event := tools.NATSResponse{
		FromApp:  "oc-scheduler",
		Datatype: dt,
		Method:   int(tools.REMOVE_RESOURCE),
		Payload:  encoded,
	}
	tools.NewNATSCaller().SetNATSPub(tools.REMOVE_RESOURCE, event)
}
// ---------------------------------------------------------------------------
// Deadline watchers
// ---------------------------------------------------------------------------
// WatchExecDeadline arranges for purgeUnconfirmedExecution to run for
// executionID one minute before execDate, so stale drafts do not keep
// blocking resources. When that deadline is already reached (e.g. after a
// process restart) the purge is started immediately in its own goroutine.
// The call itself never blocks on the deadline.
func WatchExecDeadline(executionID string, execDate time.Time, selfID *peer.Peer, request *tools.APIRequest) {
	fmt.Println("WatchExecDeadline")
	purge := func() { purgeUnconfirmedExecution(executionID, selfID, request) }

	deadline := execDate.UTC().Add(-time.Minute)
	if remaining := time.Until(deadline); remaining > 0 {
		time.AfterFunc(remaining, purge)
		return
	}
	go purge()
}
// purgeUnconfirmedExecution removes an execution, together with its bookings,
// if it is still a draft when the deadline watcher fires.
//
// The execution is re-read first because the timer may have been armed long
// ago: nothing is done when it no longer exists (replaced or cleaned up) or
// when it has left the draft state in the meantime. Otherwise it is
// unscheduled; if that fails, a plain delete of the execution is attempted.
// Outcomes are logged; nothing is returned.
func purgeUnconfirmedExecution(executionID string, selfID *peer.Peer, request *tools.APIRequest) {
	acc := workflow_execution.NewAccessor(&tools.APIRequest{Admin: true})

	res, _, err := acc.LoadOne(executionID)
	if err != nil || res == nil {
		fmt.Printf("purgeUnconfirmedExecution: execution %s not loaded, nothing to purge: %v\n", executionID, err)
		return
	}
	if exec, ok := res.(*workflow_execution.WorkflowExecution); ok && !exec.IsDraft {
		// Confirmed since the watcher was armed: it must not be purged.
		return
	}

	if err := UnscheduleExecution(executionID, selfID, request); err != nil {
		fmt.Printf("purgeUnconfirmedExecution: unschedule failed for execution %s: %v\n", executionID, err)
		// Fallback so the stale draft does not survive a failed unschedule.
		if _, _, delErr := acc.DeleteOne(executionID); delErr != nil {
			fmt.Printf("purgeUnconfirmedExecution: delete failed for execution %s: %v\n", executionID, delErr)
			return
		}
	}
	fmt.Printf("purgeUnconfirmedExecution: cleaned up resources for execution %s\n", executionID)
}
// RecoverDraftExecutions is called at startup to restore deadline watchers for
// draft executions that survived a process restart. Executions already past
// their deadline are purged immediately (see WatchExecDeadline).
//
// It blocks until the local peer can be resolved, retrying every 5 seconds.
// For each execution returned by the search it re-registers the execution
// lock and starts a deadline watcher using an admin request.
func RecoverDraftExecutions() {
	adminReq := &tools.APIRequest{Admin: true}

	// The watchers need the local peer; keep retrying until it is available.
	var selfID *peer.Peer
	for selfID == nil {
		selfID, _ = oclib.GetMySelf()
		if selfID == nil {
			time.Sleep(5 * time.Second)
		}
	}

	results, _, err := workflow_execution.NewAccessor(adminReq).Search(nil, "*", true)
	if err != nil {
		// Make a failed recovery visible instead of reporting "0 recovered".
		fmt.Printf("RecoverDraftExecutions: search failed: %v\n", err)
	}

	recovered := 0
	for _, obj := range results {
		exec, ok := obj.(*workflow_execution.WorkflowExecution)
		if !ok {
			continue
		}
		RegisterExecLock(exec.GetID())
		go WatchExecDeadline(exec.GetID(), exec.ExecDate, selfID, adminReq)
		recovered++
	}
	// Count only executions that actually got a watcher.
	fmt.Printf("RecoverDraftExecutions: recovered %d draft executions\n", recovered)
}
// ---------------------------------------------------------------------------
// Unschedule
// ---------------------------------------------------------------------------
// UnscheduleExecution deletes all bookings for an execution (via PeerBookByGraph)
// then deletes the execution itself and releases its execution lock.
//
// Bookings that cannot be loaded are skipped. Each loaded booking is removed
// on its destination peer through deleteScheduling.
//
// It returns an error when the execution cannot be loaded, is not a
// *WorkflowExecution, or cannot be deleted. In the first two cases nothing is
// deleted and the lock is left untouched.
func UnscheduleExecution(executionID string, selfID *peer.Peer, request *tools.APIRequest) error {
	fmt.Println("UnscheduleExecution")
	adminReq := &tools.APIRequest{Admin: true}

	res, _, err := workflow_execution.NewAccessor(adminReq).LoadOne(executionID)
	if err != nil {
		return fmt.Errorf("execution %s not found: %w", executionID, err)
	}
	if res == nil {
		// No underlying error to wrap here; %w with a nil error would print garbage.
		return fmt.Errorf("execution %s not found", executionID)
	}
	exec, ok := res.(*workflow_execution.WorkflowExecution)
	if !ok {
		return fmt.Errorf("execution %s has unexpected type %T", executionID, res)
	}

	for _, byResource := range exec.PeerBookByGraph {
		for _, bookingIDs := range byResource {
			for _, bkID := range bookingIDs {
				bkRes, _, loadErr := booking.NewAccessor(adminReq).LoadOne(bkID)
				fmt.Println("UnscheduleExecution", bkID, loadErr)
				if loadErr != nil || bkRes == nil {
					continue
				}
				deleteScheduling(tools.BOOKING, scheduling.ToSchedulerObject(tools.BOOKING, bkRes), selfID, request)
			}
		}
	}

	_, _, delErr := workflow_execution.NewAccessor(adminReq).DeleteOne(executionID)
	// The lock is released even when the delete fails, as before.
	UnregisterExecLock(executionID)
	if delErr != nil {
		return fmt.Errorf("deleting execution %s: %w", executionID, delErr)
	}
	return nil
}