Scheduling Node
This commit is contained in:
345
infrastructure/session.go
Normal file
345
infrastructure/session.go
Normal file
@@ -0,0 +1,345 @@
|
||||
package infrastructure
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"oc-scheduler/infrastructure/scheduling"
|
||||
"time"
|
||||
|
||||
oclib "cloud.o-forge.io/core/oc-lib"
|
||||
"cloud.o-forge.io/core/oc-lib/dbs"
|
||||
"cloud.o-forge.io/core/oc-lib/models/booking"
|
||||
"cloud.o-forge.io/core/oc-lib/models/order"
|
||||
"cloud.o-forge.io/core/oc-lib/models/peer"
|
||||
"cloud.o-forge.io/core/oc-lib/models/resources/purchase_resource"
|
||||
"cloud.o-forge.io/core/oc-lib/models/utils"
|
||||
"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
|
||||
"cloud.o-forge.io/core/oc-lib/tools"
|
||||
)
|
||||
|
||||
// removeResourcePayload is sent via NATS REMOVE_RESOURCE so the receiver can
|
||||
// verify the delete order comes from the original scheduler session.
|
||||
type removeResourcePayload struct {
|
||||
ID string `json:"id"`
|
||||
SchedulerPeerID string `json:"scheduler_peer_id"`
|
||||
ExecutionsID string `json:"executions_id"`
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// DB helpers — objects are found via executions_id
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func sessionIDFilter(field, id string) *dbs.Filters {
|
||||
return &dbs.Filters{
|
||||
And: map[string][]dbs.Filter{
|
||||
field: {{Operator: dbs.EQUAL.String(), Value: id}},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func loadSession(executionsID string, dt tools.DataType) []scheduling.SchedulerObject {
|
||||
results := oclib.NewRequestAdmin(oclib.LibDataEnum(dt), nil).Search(
|
||||
sessionIDFilter("executions_id", executionsID), "", true)
|
||||
out := make([]scheduling.SchedulerObject, 0, len(results.Data))
|
||||
for _, obj := range results.Data {
|
||||
out = append(out, scheduling.ToSchedulerObject(dt, obj))
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func loadSessionExecs(executionsID string) []*workflow_execution.WorkflowExecution {
|
||||
adminReq := &tools.APIRequest{Admin: true}
|
||||
results, _, _ := workflow_execution.NewAccessor(adminReq).Search(
|
||||
sessionIDFilter("executions_id", executionsID), "", true)
|
||||
out := make([]*workflow_execution.WorkflowExecution, 0, len(results))
|
||||
for _, obj := range results {
|
||||
if exec, ok := obj.(*workflow_execution.WorkflowExecution); ok {
|
||||
out = append(out, exec)
|
||||
}
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func loadSessionOrder(executionsID string) *order.Order {
|
||||
adminReq := &tools.APIRequest{Admin: true}
|
||||
results, _, _ := order.NewAccessor(adminReq).Search(
|
||||
sessionIDFilter("executions_id", executionsID), "", true)
|
||||
for _, obj := range results {
|
||||
if o, ok := obj.(*order.Order); ok {
|
||||
return o
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Session upsert
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// UpsertSessionDrafts creates or updates draft bookings/purchases/executions for a
|
||||
// Check session. Existing objects are found via the DB (executions_id).
|
||||
// Called on first successful check and on user date changes.
|
||||
//
|
||||
// - bookings/purchases: upserted by (resourceID, instanceID); stale ones deleted
|
||||
// - executions: replaced on every call (dates may have changed)
|
||||
// - order: created once, updated on subsequent calls
|
||||
func (ws *WorkflowSchedule) UpsertSessionDrafts(wfID, executionsID string, selfID *peer.Peer, request *tools.APIRequest) {
|
||||
_, _, execs, purchases, bookings, err := ws.GetBuyAndBook(wfID, request)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
adminReq := &tools.APIRequest{Admin: true}
|
||||
|
||||
// --- bookings ---
|
||||
existing := map[string]scheduling.SchedulerObject{}
|
||||
seen := map[string]bool{}
|
||||
for dt, datas := range map[tools.DataType][]scheduling.SchedulerObject{
|
||||
tools.BOOKING: bookings, tools.PURCHASE_RESOURCE: purchases,
|
||||
} {
|
||||
for _, bk := range loadSession(executionsID, dt) {
|
||||
existing[bk.GetKey()] = bk
|
||||
}
|
||||
upsertSessionDrafts(dt, datas, existing, seen, selfID, executionsID, request)
|
||||
for key, prev := range existing {
|
||||
if !seen[key] {
|
||||
deleteScheduling(dt, prev, selfID, request)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
// --- executions: replace on every call (dates may have changed) ---
|
||||
for _, old := range loadSessionExecs(executionsID) {
|
||||
UnregisterExecLock(old.GetID())
|
||||
workflow_execution.NewAccessor(adminReq).DeleteOne(old.GetID())
|
||||
}
|
||||
for _, exec := range execs {
|
||||
exec.ExecutionsID = executionsID
|
||||
exec.IsDraft = true
|
||||
ex, _, err := utils.GenericStoreOne(exec, workflow_execution.NewAccessor(adminReq))
|
||||
if err == nil {
|
||||
RegisterExecLock(ex.GetID())
|
||||
go WatchExecDeadline(ex.GetID(), exec.ExecDate, selfID, request)
|
||||
}
|
||||
}
|
||||
|
||||
// --- order: create once, update on subsequent calls ---
|
||||
if existing := loadSessionOrder(executionsID); existing == nil {
|
||||
ws.GenerateOrder(purchases, bookings, executionsID, request)
|
||||
} else {
|
||||
for _, purch := range purchases {
|
||||
existing.Purchases = append(
|
||||
existing.Purchases, scheduling.FromSchedulerObject(tools.PURCHASE_RESOURCE, purch).(*purchase_resource.PurchaseResource))
|
||||
}
|
||||
for _, b := range bookings {
|
||||
existing.Bookings = append(
|
||||
existing.Bookings, scheduling.FromSchedulerObject(tools.BOOKING, b).(*booking.Booking))
|
||||
}
|
||||
utils.GenericRawUpdateOne(existing, existing.GetID(), order.NewAccessor(adminReq))
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Session lifecycle
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func upsertSessionDrafts(dt tools.DataType, datas []scheduling.SchedulerObject, existing map[string]scheduling.SchedulerObject,
|
||||
seen map[string]bool, selfID *peer.Peer,
|
||||
executionsID string, request *tools.APIRequest) {
|
||||
fmt.Println("UpsertSessionDrafts", len(datas), len(existing))
|
||||
for _, bk := range datas {
|
||||
bk.SetSchedulerPeerID(selfID.PeerID)
|
||||
bk.SetExecutionsID(executionsID)
|
||||
seen[bk.GetKey()] = true
|
||||
if prev, ok := existing[bk.GetKey()]; ok {
|
||||
bk.SetID(prev.GetID())
|
||||
bk.SetIsDraft(false)
|
||||
// Convert to concrete type (Booking/PurchaseResource) so that
|
||||
// GenericRawUpdateOne serializes the real struct, not the wrapper.
|
||||
propagateWriteResource(
|
||||
scheduling.FromSchedulerDBObject(dt, bk), bk.GetDestPeer(), dt, selfID, request)
|
||||
} else {
|
||||
errCh := make(chan error, 1)
|
||||
propagateResource(scheduling.FromSchedulerDBObject(dt, bk), bk.GetDestPeer(), dt, selfID, request, errCh)
|
||||
<-errCh
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// CleanupSession deletes all draft bookings/purchases/executions/order for a
|
||||
// session (called when the WebSocket closes without a confirm).
|
||||
func CleanupSession(self *peer.Peer, executionsID string, selfID *peer.Peer, request *tools.APIRequest) {
|
||||
adminReq := &tools.APIRequest{Admin: true}
|
||||
for _, exec := range loadSessionExecs(executionsID) {
|
||||
UnscheduleExecution(exec.GetID(), selfID, request)
|
||||
workflow_execution.NewAccessor(adminReq).DeleteOne(exec.GetID())
|
||||
}
|
||||
if o := loadSessionOrder(executionsID); o != nil {
|
||||
order.NewAccessor(adminReq).DeleteOne(o.GetID())
|
||||
}
|
||||
}
|
||||
|
||||
// ConfirmSession flips all session drafts to IsDraft=false and propagates them.
|
||||
// The considers mechanism then transitions executions to IsDraft=false once
|
||||
// all remote peers acknowledge.
|
||||
func ConfirmSession(executionsID string, selfID *peer.Peer, request *tools.APIRequest) error {
|
||||
for _, dt := range []tools.DataType{tools.BOOKING, tools.PURCHASE_RESOURCE} {
|
||||
for _, bk := range loadSession(executionsID, dt) {
|
||||
bk.SetIsDraft(false)
|
||||
propagateWriteResource(
|
||||
scheduling.FromSchedulerDBObject(dt, bk), bk.GetDestPeer(), dt, selfID, request)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// confirmSessionOrder sets the order IsDraft=false once all considers are received.
|
||||
func confirmSessionOrder(executionsID string, adminReq *tools.APIRequest) {
|
||||
if o := loadSessionOrder(executionsID); o != nil {
|
||||
o.IsDraft = false
|
||||
utils.GenericRawUpdateOne(o, o.GetID(), order.NewAccessor(adminReq))
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Propagation
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// propagateWriteResource routes a booking/purchase write to its destination:
|
||||
// - local peer → DB upsert; emits considers on confirm (IsDraft=false)
|
||||
// - remote peer → NATS CREATE_RESOURCE (receiver upserts)
|
||||
func propagateWriteResource(obj utils.DBObject, destPeerID string, dt tools.DataType, selfID *peer.Peer, request *tools.APIRequest) {
|
||||
if destPeerID == selfID.GetID() {
|
||||
if _, _, err := utils.GenericRawUpdateOne(obj, obj.GetID(), obj.GetAccessor(request)); err != nil {
|
||||
fmt.Printf("propagateWriteResource: local update failed for %s %s: %v\n", dt, obj.GetID(), err)
|
||||
return
|
||||
}
|
||||
if dt == tools.BOOKING {
|
||||
go refreshSelfPlanner(selfID.PeerID, request)
|
||||
}
|
||||
fmt.Println("IS DRAFTED", obj.IsDrafted())
|
||||
if !obj.IsDrafted() {
|
||||
if payload, err := json.Marshal(&executionConsidersPayload{
|
||||
ID: obj.GetID(),
|
||||
}); err == nil {
|
||||
go updateExecutionState(payload, dt)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
payload, err := json.Marshal(obj)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
tools.NewNATSCaller().SetNATSPub(tools.CREATE_RESOURCE, tools.NATSResponse{
|
||||
FromApp: "oc-scheduler",
|
||||
Datatype: dt,
|
||||
Method: int(tools.CREATE_RESOURCE),
|
||||
Payload: payload,
|
||||
})
|
||||
}
|
||||
|
||||
// deleteBooking deletes a booking from its destination peer (local DB or NATS).
|
||||
func deleteScheduling(dt tools.DataType, bk scheduling.SchedulerObject, selfID *peer.Peer, request *tools.APIRequest) {
|
||||
if bk.GetDestPeer() == selfID.GetID() {
|
||||
oclib.NewRequestAdmin(oclib.LibDataEnum(dt), nil).DeleteOne(bk.GetID())
|
||||
go refreshSelfPlanner(selfID.PeerID, request)
|
||||
return
|
||||
}
|
||||
emitNATSRemove(bk.GetID(), bk.GetPeerSession(), bk.GetExecutionsId(), dt)
|
||||
}
|
||||
|
||||
// emitNATSRemove sends a REMOVE_RESOURCE event to the remote peer carrying
|
||||
// auth fields so the receiver can verify the delete is legitimate.
|
||||
func emitNATSRemove(id, schedulerPeerID, executionsID string, dt tools.DataType) {
|
||||
payload, _ := json.Marshal(removeResourcePayload{
|
||||
ID: id,
|
||||
SchedulerPeerID: schedulerPeerID,
|
||||
ExecutionsID: executionsID,
|
||||
})
|
||||
tools.NewNATSCaller().SetNATSPub(tools.REMOVE_RESOURCE, tools.NATSResponse{
|
||||
FromApp: "oc-scheduler",
|
||||
Datatype: dt,
|
||||
Method: int(tools.REMOVE_RESOURCE),
|
||||
Payload: payload,
|
||||
})
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Deadline watchers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// WatchExecDeadline purges all unconfirmed bookings/purchases for an execution
|
||||
// one minute before its scheduled start, to avoid stale drafts blocking resources.
|
||||
// If the deadline has already passed (e.g. after a process restart), it fires immediately.
|
||||
func WatchExecDeadline(executionID string, execDate time.Time, selfID *peer.Peer, request *tools.APIRequest) {
|
||||
fmt.Println("WatchExecDeadline")
|
||||
delay := time.Until(execDate.UTC().Add(-1 * time.Minute))
|
||||
if delay <= 0 {
|
||||
go purgeUnconfirmedExecution(executionID, selfID, request)
|
||||
return
|
||||
}
|
||||
time.AfterFunc(delay, func() { purgeUnconfirmedExecution(executionID, selfID, request) })
|
||||
}
|
||||
|
||||
func purgeUnconfirmedExecution(executionID string, selfID *peer.Peer, request *tools.APIRequest) {
|
||||
acc := workflow_execution.NewAccessor(&tools.APIRequest{Admin: true})
|
||||
UnscheduleExecution(executionID, selfID, request)
|
||||
_, _, err := acc.DeleteOne(executionID)
|
||||
fmt.Printf("purgeUnconfirmedExecution: cleaned up resources for execution %s\n", err)
|
||||
}
|
||||
|
||||
// RecoverDraftExecutions is called at startup to restore deadline watchers for
|
||||
// draft executions that survived a process restart. Executions already past
|
||||
// their deadline are purged immediately.
|
||||
func RecoverDraftExecutions() {
|
||||
adminReq := &tools.APIRequest{Admin: true}
|
||||
var selfID *peer.Peer
|
||||
for selfID == nil {
|
||||
selfID, _ = oclib.GetMySelf()
|
||||
if selfID == nil {
|
||||
time.Sleep(5 * time.Second)
|
||||
}
|
||||
}
|
||||
results, _, _ := workflow_execution.NewAccessor(adminReq).Search(nil, "*", true)
|
||||
for _, obj := range results {
|
||||
exec, ok := obj.(*workflow_execution.WorkflowExecution)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
RegisterExecLock(exec.GetID())
|
||||
go WatchExecDeadline(exec.GetID(), exec.ExecDate, selfID, adminReq)
|
||||
}
|
||||
fmt.Printf("RecoverDraftExecutions: recovered %d draft executions\n", len(results))
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Unschedule
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// UnscheduleExecution deletes all bookings for an execution (via PeerBookByGraph)
|
||||
// then deletes the execution itself.
|
||||
func UnscheduleExecution(executionID string, selfID *peer.Peer, request *tools.APIRequest) error {
|
||||
fmt.Println("UnscheduleExecution")
|
||||
adminReq := &tools.APIRequest{Admin: true}
|
||||
res, _, err := workflow_execution.NewAccessor(adminReq).LoadOne(executionID)
|
||||
if err != nil || res == nil {
|
||||
return fmt.Errorf("execution %s not found: %w", executionID, err)
|
||||
}
|
||||
exec := res.(*workflow_execution.WorkflowExecution)
|
||||
for _, byResource := range exec.PeerBookByGraph {
|
||||
for _, bookingIDs := range byResource {
|
||||
for _, bkID := range bookingIDs {
|
||||
bkRes, _, loadErr := booking.NewAccessor(adminReq).LoadOne(bkID)
|
||||
fmt.Println("UnscheduleExecution", bkID, loadErr)
|
||||
if loadErr != nil || bkRes == nil {
|
||||
continue
|
||||
}
|
||||
deleteScheduling(tools.BOOKING, scheduling.ToSchedulerObject(tools.BOOKING, bkRes), selfID, request)
|
||||
}
|
||||
}
|
||||
}
|
||||
workflow_execution.NewAccessor(adminReq).DeleteOne(executionID)
|
||||
UnregisterExecLock(executionID)
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user