Files
oc-scheduler/infrastructure/nats_handlers.go

249 lines
8.3 KiB
Go
Raw Permalink Normal View History

2026-03-17 11:58:27 +01:00
package infrastructure
import (
"encoding/json"
"fmt"
"time"
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/models/booking"
"cloud.o-forge.io/core/oc-lib/models/booking/planner"
"cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/peer"
"cloud.o-forge.io/core/oc-lib/models/resources/purchase_resource"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/models/workflow"
"cloud.o-forge.io/core/oc-lib/tools"
)
2026-03-17 15:12:29 +01:00
// handleConfirm forwards a confirmation message to confirmResource. The raw
// payload bytes are used as the resource ID and the message Datatype selects
// which kind of resource is confirmed.
func handleConfirm(resp tools.NATSResponse) {
	resourceID := string(resp.Payload)
	confirmResource(resourceID, resp.Datatype)
}
2026-03-17 11:58:27 +01:00
// handlePlannerExecution decodes a planner received over NATS and hands it to
// storePlanner, keyed by the "peer_id" field of the same payload.
//
// The payload is decoded twice: once generically to read "peer_id", and once
// into a planner.Planner. Messages that fail to decode, or that carry no usable
// peer ID, are dropped.
func handlePlannerExecution(resp tools.NATSResponse) {
	m := map[string]interface{}{}
	if err := json.Unmarshal(resp.Payload, &m); err != nil {
		return
	}

	// A missing or null "peer_id" would be formatted by %v as the literal
	// string "<nil>", and the planner would be stored under that bogus key.
	rawPeerID, ok := m["peer_id"]
	if !ok || rawPeerID == nil {
		fmt.Println("handlePlannerExecution: payload has no peer_id, ignoring")
		return
	}
	peerID := fmt.Sprintf("%v", rawPeerID)
	if peerID == "" {
		fmt.Println("handlePlannerExecution: payload has an empty peer_id, ignoring")
		return
	}

	p := planner.Planner{}
	if err := json.Unmarshal(resp.Payload, &p); err != nil {
		return
	}
	storePlanner(peerID, &p)
}
2026-03-18 16:44:20 +01:00
func handleConsidersEvent(resp tools.NATSResponse) {
fmt.Println("CONSIDERS_EVENT", resp.Datatype)
switch resp.Datatype {
case tools.BOOKING, tools.PURCHASE_RESOURCE:
fmt.Println("updateExecutionState", resp.Datatype)
updateExecutionState(resp.Payload, resp.Datatype)
case tools.WORKFLOW_EXECUTION:
confirmExecutionDrafts(resp.Payload)
2026-03-17 11:58:27 +01:00
}
}
// handleRemoveResource reacts to a REMOVE_RESOURCE message according to its
// Datatype:
//
//   - WORKFLOW: decodes the workflow and notifies its watchers.
//   - BOOKING: deletes the stored booking, then refreshes the local planner.
//   - PURCHASE_RESOURCE: deletes the stored purchase.
//
// For bookings and purchases the payload is a removeResourcePayload; the
// deletion only happens when its SchedulerPeerID and ExecutionsID both match
// the stored record. Undecodable payloads and unknown IDs are ignored.
func handleRemoveResource(resp tools.NATSResponse) {
	switch resp.Datatype {
	case tools.WORKFLOW:
		wf := workflow.Workflow{}
		if err := json.Unmarshal(resp.Payload, &wf); err != nil {
			return
		}
		notifyWorkflowWatchers(wf.GetID())

	case tools.BOOKING:
		var p removeResourcePayload
		if err := json.Unmarshal(resp.Payload, &p); err != nil {
			return
		}
		self, err := oclib.GetMySelf()
		if err != nil || self == nil {
			return
		}
		adminReq := &tools.APIRequest{Admin: true}
		res, _, loadErr := booking.NewAccessor(adminReq).LoadOne(p.ID)
		if loadErr != nil || res == nil {
			return
		}
		// Checked assertion: an unexpected or typed-nil result must not panic
		// the handler.
		existing, ok := res.(*booking.Booking)
		if !ok || existing == nil {
			fmt.Println("ListenNATS REMOVE_RESOURCE booking: unexpected stored type, ignoring", p.ID)
			return
		}
		if existing.SchedulerPeerID != p.SchedulerPeerID || existing.ExecutionsID != p.ExecutionsID {
			fmt.Println("ListenNATS REMOVE_RESOURCE booking: auth mismatch, ignoring", p.ID)
			return
		}
		if _, _, delErr := booking.NewAccessor(adminReq).DeleteOne(p.ID); delErr != nil {
			// Nothing was removed, so the planner does not need a refresh.
			fmt.Println("ListenNATS REMOVE_RESOURCE booking: delete failed:", delErr)
			return
		}
		go refreshSelfPlanner(self.PeerID, adminReq)

	case tools.PURCHASE_RESOURCE:
		var p removeResourcePayload
		if err := json.Unmarshal(resp.Payload, &p); err != nil {
			return
		}
		adminReq := &tools.APIRequest{Admin: true}
		res, _, loadErr := purchase_resource.NewAccessor(adminReq).LoadOne(p.ID)
		if loadErr != nil || res == nil {
			return
		}
		existing, ok := res.(*purchase_resource.PurchaseResource)
		if !ok || existing == nil {
			fmt.Println("ListenNATS REMOVE_RESOURCE purchase: unexpected stored type, ignoring", p.ID)
			return
		}
		if existing.SchedulerPeerID != p.SchedulerPeerID || existing.ExecutionsID != p.ExecutionsID {
			fmt.Println("ListenNATS REMOVE_RESOURCE purchase: auth mismatch, ignoring", p.ID)
			return
		}
		if _, _, delErr := purchase_resource.NewAccessor(adminReq).DeleteOne(p.ID); delErr != nil {
			fmt.Println("ListenNATS REMOVE_RESOURCE purchase: delete failed:", delErr)
		}
	}
}
func handleCreateBooking(bk *booking.Booking, self *peer.Peer, adminReq *tools.APIRequest) {
// Upsert: if a booking with this ID already exists, verify auth and update.
if existing, _, loadErr := booking.NewAccessor(adminReq).LoadOne(bk.GetID()); loadErr == nil && existing != nil {
prev := existing.(*booking.Booking)
if prev.SchedulerPeerID != bk.SchedulerPeerID || prev.ExecutionsID != bk.ExecutionsID {
fmt.Println("ListenNATS CREATE_RESOURCE booking upsert: auth mismatch, ignoring", bk.GetID())
return
}
if !prev.IsDrafted() && bk.IsDraft {
// Already confirmed, refuse downgrade.
return
}
// Expired check only on confirmation (IsDraft→false).
2026-03-19 09:25:46 +01:00
if !bk.IsDraft && !prev.ExpectedStartDate.IsZero() && prev.ExpectedStartDate.Before(time.Now().UTC()) {
2026-03-17 11:58:27 +01:00
fmt.Println("ListenNATS CREATE_RESOURCE booking: expired, deleting", bk.GetID())
booking.NewAccessor(adminReq).DeleteOne(bk.GetID())
return
}
if _, _, err := utils.GenericRawUpdateOne(bk, bk.GetID(), booking.NewAccessor(adminReq)); err != nil {
fmt.Println("ListenNATS CREATE_RESOURCE booking update failed:", err)
return
}
go refreshSelfPlanner(self.PeerID, adminReq)
if !bk.IsDraft {
go applyConsidersLocal(bk.GetID(), tools.BOOKING)
}
return
}
// New booking: standard create flow.
2026-03-19 09:25:46 +01:00
if !bk.ExpectedStartDate.IsZero() && bk.ExpectedStartDate.Before(time.Now().UTC()) {
2026-03-17 11:58:27 +01:00
fmt.Println("ListenNATS: booking start date is in the past, discarding")
return
}
plannerMu.RLock()
selfEntry := PlannerCache[self.PeerID]
plannerMu.RUnlock()
if selfEntry != nil && selfEntry.Planner != nil && !checkInstance(selfEntry.Planner, bk.ResourceID, bk.InstanceID, bk.ExpectedStartDate, bk.ExpectedEndDate) {
fmt.Println("ListenNATS: booking conflicts with local planner, discarding")
return
}
bk.IsDraft = true
stored, _, err := booking.NewAccessor(adminReq).StoreOne(bk)
if err != nil {
fmt.Println("ListenNATS: could not store booking:", err)
return
}
storedID := stored.GetID()
go refreshSelfPlanner(self.PeerID, adminReq)
time.AfterFunc(10*time.Minute, func() { draftTimeout(storedID, tools.BOOKING) })
}
// handleCreatePurchase applies a purchase resource received through a
// CREATE_RESOURCE message. Purchases whose DestPeerID is not the local peer
// are ignored.
//
// If no purchase with the same ID can be loaded, the purchase is stored as a
// draft and draftTimeout is scheduled 10 minutes later. Otherwise the message
// is an upsert: SchedulerPeerID and ExecutionsID must match the stored record,
// a confirmed purchase is never put back into draft, and a confirming update
// starts applyConsidersLocal.
func handleCreatePurchase(pr *purchase_resource.PurchaseResource, self *peer.Peer, adminReq *tools.APIRequest) {
	const draftTTL = 10 * time.Minute

	if pr.DestPeerID != self.GetID() {
		return
	}

	found, _, lookupErr := purchase_resource.NewAccessor(adminReq).LoadOne(pr.GetID())
	if lookupErr != nil || found == nil {
		// New purchase: standard create flow.
		pr.IsDraft = true
		saved, _, storeErr := purchase_resource.NewAccessor(adminReq).StoreOne(pr)
		if storeErr != nil {
			fmt.Println("ListenNATS: could not store purchase:", storeErr)
			return
		}
		savedID := saved.GetID()
		time.AfterFunc(draftTTL, func() { draftTimeout(savedID, tools.PURCHASE_RESOURCE) })
		return
	}

	// Upsert: the purchase already exists, so verify auth before updating it.
	current := found.(*purchase_resource.PurchaseResource)
	sameOrigin := current.SchedulerPeerID == pr.SchedulerPeerID && current.ExecutionsID == pr.ExecutionsID
	if !sameOrigin {
		fmt.Println("ListenNATS CREATE_RESOURCE purchase upsert: auth mismatch, ignoring", pr.GetID())
		return
	}
	if pr.IsDraft && !current.IsDrafted() {
		// Already confirmed: do not fall back to draft.
		return
	}

	_, _, updateErr := utils.GenericRawUpdateOne(pr, pr.GetID(), purchase_resource.NewAccessor(adminReq))
	if updateErr != nil {
		fmt.Println("ListenNATS CREATE_RESOURCE purchase update failed:", updateErr)
		return
	}
	if pr.IsDraft {
		return
	}
	go applyConsidersLocal(pr.GetID(), tools.PURCHASE_RESOURCE)
}
// handleCreateResource reacts to a CREATE_RESOURCE message according to its
// Datatype:
//
//   - WORKFLOW: decodes the workflow, broadcasts its planner and notifies its
//     watchers.
//   - BOOKING: decodes the booking and passes it to handleCreateBooking.
//   - PURCHASE_RESOURCE: decodes the purchase and passes it to
//     handleCreatePurchase.
//
// Undecodable payloads are dropped. Bookings and purchases are also dropped
// when the local peer cannot be resolved, because both handlers dereference it.
func handleCreateResource(resp tools.NATSResponse) {
	switch resp.Datatype {
	case tools.WORKFLOW:
		wf := workflow.Workflow{}
		if err := json.Unmarshal(resp.Payload, &wf); err != nil {
			return
		}
		broadcastPlanner(&wf)
		notifyWorkflowWatchers(wf.GetID())

	case tools.BOOKING:
		var bk booking.Booking
		if err := json.Unmarshal(resp.Payload, &bk); err != nil {
			return
		}
		self, err := oclib.GetMySelf()
		if err != nil || self == nil {
			// handleCreateBooking reads self.PeerID; a nil peer would panic.
			fmt.Println("ListenNATS CREATE_RESOURCE booking: could not resolve local peer:", err)
			return
		}
		// NOTE(review): the bk.DestPeerID != self.GetID() filter was disabled
		// in this branch and is left disabled here — confirm that accepting
		// bookings addressed to other peers is intended.
		adminReq := &tools.APIRequest{Admin: true}
		handleCreateBooking(&bk, self, adminReq)

	case tools.PURCHASE_RESOURCE:
		var pr purchase_resource.PurchaseResource
		if err := json.Unmarshal(resp.Payload, &pr); err != nil {
			return
		}
		self, err := oclib.GetMySelf()
		if err != nil || self == nil {
			return
		}
		adminReq := &tools.APIRequest{Admin: true}
		handleCreatePurchase(&pr, self, adminReq)
	}
}
// confirmResource takes a booking or purchase resource out of draft by
// loading it by id, clearing IsDraft and writing it back.
// A booking additionally has its State set to SCHEDULED and, once the update
// succeeded and the local peer can be resolved, triggers a planner refresh.
// Any other datatype is ignored; load and update failures are only logged.
func confirmResource(id string, dt tools.DataType) {
	adminReq := &tools.APIRequest{Admin: true}

	if dt == tools.BOOKING {
		loaded, _, loadErr := booking.NewAccessor(adminReq).LoadOne(id)
		if loaded == nil || loadErr != nil {
			fmt.Printf("confirmResource: could not load booking %s: %v\n", id, loadErr)
			return
		}
		confirmed := loaded.(*booking.Booking)
		confirmed.IsDraft, confirmed.State = false, enum.SCHEDULED

		_, _, updateErr := utils.GenericRawUpdateOne(confirmed, id, booking.NewAccessor(adminReq))
		if updateErr != nil {
			fmt.Printf("confirmResource: could not confirm booking %s: %v\n", id, updateErr)
			return
		}
		if self, selfErr := oclib.GetMySelf(); selfErr == nil && self != nil {
			go refreshSelfPlanner(self.PeerID, adminReq)
		}
		return
	}

	if dt != tools.PURCHASE_RESOURCE {
		return
	}
	loaded, _, loadErr := purchase_resource.NewAccessor(adminReq).LoadOne(id)
	if loaded == nil || loadErr != nil {
		fmt.Printf("confirmResource: could not load purchase %s: %v\n", id, loadErr)
		return
	}
	confirmed := loaded.(*purchase_resource.PurchaseResource)
	confirmed.IsDraft = false

	_, _, updateErr := utils.GenericRawUpdateOne(confirmed, id, purchase_resource.NewAccessor(adminReq))
	if updateErr != nil {
		fmt.Printf("confirmResource: could not confirm purchase %s: %v\n", id, updateErr)
	}
}