2026-03-17 11:58:27 +01:00
|
|
|
package infrastructure
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"encoding/json"
|
|
|
|
|
"fmt"
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
oclib "cloud.o-forge.io/core/oc-lib"
|
|
|
|
|
"cloud.o-forge.io/core/oc-lib/models/booking"
|
|
|
|
|
"cloud.o-forge.io/core/oc-lib/models/booking/planner"
|
|
|
|
|
"cloud.o-forge.io/core/oc-lib/models/common/enum"
|
|
|
|
|
"cloud.o-forge.io/core/oc-lib/models/peer"
|
|
|
|
|
"cloud.o-forge.io/core/oc-lib/models/resources/purchase_resource"
|
|
|
|
|
"cloud.o-forge.io/core/oc-lib/models/utils"
|
|
|
|
|
"cloud.o-forge.io/core/oc-lib/models/workflow"
|
|
|
|
|
"cloud.o-forge.io/core/oc-lib/tools"
|
|
|
|
|
)
|
|
|
|
|
|
2026-03-17 15:12:29 +01:00
|
|
|
func handleConfirm(resp tools.NATSResponse) {
|
|
|
|
|
confirmResource(string(resp.Payload), resp.Datatype)
|
|
|
|
|
}
|
|
|
|
|
|
2026-03-17 11:58:27 +01:00
|
|
|
func handlePlannerExecution(resp tools.NATSResponse) {
|
|
|
|
|
m := map[string]interface{}{}
|
|
|
|
|
p := planner.Planner{}
|
|
|
|
|
if err := json.Unmarshal(resp.Payload, &m); err != nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if err := json.Unmarshal(resp.Payload, &p); err != nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
storePlanner(fmt.Sprintf("%v", m["peer_id"]), &p)
|
|
|
|
|
}
|
|
|
|
|
|
2026-03-18 16:44:20 +01:00
|
|
|
func handleConsidersEvent(resp tools.NATSResponse) {
|
|
|
|
|
fmt.Println("CONSIDERS_EVENT", resp.Datatype)
|
|
|
|
|
switch resp.Datatype {
|
|
|
|
|
case tools.BOOKING, tools.PURCHASE_RESOURCE:
|
|
|
|
|
fmt.Println("updateExecutionState", resp.Datatype)
|
|
|
|
|
updateExecutionState(resp.Payload, resp.Datatype)
|
|
|
|
|
case tools.WORKFLOW_EXECUTION:
|
|
|
|
|
confirmExecutionDrafts(resp.Payload)
|
2026-03-17 11:58:27 +01:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func handleRemoveResource(resp tools.NATSResponse) {
|
|
|
|
|
switch resp.Datatype {
|
|
|
|
|
case tools.WORKFLOW:
|
|
|
|
|
wf := workflow.Workflow{}
|
|
|
|
|
if err := json.Unmarshal(resp.Payload, &wf); err != nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
notifyWorkflowWatchers(wf.GetID())
|
|
|
|
|
case tools.BOOKING:
|
|
|
|
|
var p removeResourcePayload
|
|
|
|
|
if err := json.Unmarshal(resp.Payload, &p); err != nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
self, err := oclib.GetMySelf()
|
|
|
|
|
if err != nil || self == nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
adminReq := &tools.APIRequest{Admin: true}
|
|
|
|
|
res, _, loadErr := booking.NewAccessor(adminReq).LoadOne(p.ID)
|
|
|
|
|
if loadErr != nil || res == nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
existing := res.(*booking.Booking)
|
|
|
|
|
if existing.SchedulerPeerID != p.SchedulerPeerID || existing.ExecutionsID != p.ExecutionsID {
|
|
|
|
|
fmt.Println("ListenNATS REMOVE_RESOURCE booking: auth mismatch, ignoring", p.ID)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
booking.NewAccessor(adminReq).DeleteOne(p.ID)
|
|
|
|
|
go refreshSelfPlanner(self.PeerID, adminReq)
|
|
|
|
|
case tools.PURCHASE_RESOURCE:
|
|
|
|
|
var p removeResourcePayload
|
|
|
|
|
if err := json.Unmarshal(resp.Payload, &p); err != nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
adminReq := &tools.APIRequest{Admin: true}
|
|
|
|
|
res, _, loadErr := purchase_resource.NewAccessor(adminReq).LoadOne(p.ID)
|
|
|
|
|
if loadErr != nil || res == nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
existing := res.(*purchase_resource.PurchaseResource)
|
|
|
|
|
if existing.SchedulerPeerID != p.SchedulerPeerID || existing.ExecutionsID != p.ExecutionsID {
|
|
|
|
|
fmt.Println("ListenNATS REMOVE_RESOURCE purchase: auth mismatch, ignoring", p.ID)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
purchase_resource.NewAccessor(adminReq).DeleteOne(p.ID)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func handleCreateBooking(bk *booking.Booking, self *peer.Peer, adminReq *tools.APIRequest) {
|
|
|
|
|
// Upsert: if a booking with this ID already exists, verify auth and update.
|
|
|
|
|
if existing, _, loadErr := booking.NewAccessor(adminReq).LoadOne(bk.GetID()); loadErr == nil && existing != nil {
|
|
|
|
|
prev := existing.(*booking.Booking)
|
|
|
|
|
if prev.SchedulerPeerID != bk.SchedulerPeerID || prev.ExecutionsID != bk.ExecutionsID {
|
|
|
|
|
fmt.Println("ListenNATS CREATE_RESOURCE booking upsert: auth mismatch, ignoring", bk.GetID())
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if !prev.IsDrafted() && bk.IsDraft {
|
|
|
|
|
// Already confirmed, refuse downgrade.
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
// Expired check only on confirmation (IsDraft→false).
|
2026-03-19 09:25:46 +01:00
|
|
|
if !bk.IsDraft && !prev.ExpectedStartDate.IsZero() && prev.ExpectedStartDate.Before(time.Now().UTC()) {
|
2026-03-17 11:58:27 +01:00
|
|
|
fmt.Println("ListenNATS CREATE_RESOURCE booking: expired, deleting", bk.GetID())
|
|
|
|
|
booking.NewAccessor(adminReq).DeleteOne(bk.GetID())
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if _, _, err := utils.GenericRawUpdateOne(bk, bk.GetID(), booking.NewAccessor(adminReq)); err != nil {
|
|
|
|
|
fmt.Println("ListenNATS CREATE_RESOURCE booking update failed:", err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
go refreshSelfPlanner(self.PeerID, adminReq)
|
|
|
|
|
if !bk.IsDraft {
|
|
|
|
|
go applyConsidersLocal(bk.GetID(), tools.BOOKING)
|
|
|
|
|
}
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
// New booking: standard create flow.
|
2026-03-19 09:25:46 +01:00
|
|
|
if !bk.ExpectedStartDate.IsZero() && bk.ExpectedStartDate.Before(time.Now().UTC()) {
|
2026-03-17 11:58:27 +01:00
|
|
|
fmt.Println("ListenNATS: booking start date is in the past, discarding")
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
plannerMu.RLock()
|
|
|
|
|
selfEntry := PlannerCache[self.PeerID]
|
|
|
|
|
plannerMu.RUnlock()
|
|
|
|
|
if selfEntry != nil && selfEntry.Planner != nil && !checkInstance(selfEntry.Planner, bk.ResourceID, bk.InstanceID, bk.ExpectedStartDate, bk.ExpectedEndDate) {
|
|
|
|
|
fmt.Println("ListenNATS: booking conflicts with local planner, discarding")
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
bk.IsDraft = true
|
|
|
|
|
stored, _, err := booking.NewAccessor(adminReq).StoreOne(bk)
|
|
|
|
|
if err != nil {
|
|
|
|
|
fmt.Println("ListenNATS: could not store booking:", err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
storedID := stored.GetID()
|
|
|
|
|
go refreshSelfPlanner(self.PeerID, adminReq)
|
|
|
|
|
time.AfterFunc(10*time.Minute, func() { draftTimeout(storedID, tools.BOOKING) })
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func handleCreatePurchase(pr *purchase_resource.PurchaseResource, self *peer.Peer, adminReq *tools.APIRequest) {
|
|
|
|
|
if pr.DestPeerID != self.GetID() {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
// Upsert: if a purchase with this ID already exists, verify auth and update.
|
|
|
|
|
if existing, _, loadErr := purchase_resource.NewAccessor(adminReq).LoadOne(pr.GetID()); loadErr == nil && existing != nil {
|
|
|
|
|
prev := existing.(*purchase_resource.PurchaseResource)
|
|
|
|
|
if prev.SchedulerPeerID != pr.SchedulerPeerID || prev.ExecutionsID != pr.ExecutionsID {
|
|
|
|
|
fmt.Println("ListenNATS CREATE_RESOURCE purchase upsert: auth mismatch, ignoring", pr.GetID())
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if !prev.IsDrafted() && pr.IsDraft {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if _, _, err := utils.GenericRawUpdateOne(pr, pr.GetID(), purchase_resource.NewAccessor(adminReq)); err != nil {
|
|
|
|
|
fmt.Println("ListenNATS CREATE_RESOURCE purchase update failed:", err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if !pr.IsDraft {
|
|
|
|
|
go applyConsidersLocal(pr.GetID(), tools.PURCHASE_RESOURCE)
|
|
|
|
|
}
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
// New purchase: standard create flow.
|
|
|
|
|
pr.IsDraft = true
|
|
|
|
|
stored, _, err := purchase_resource.NewAccessor(adminReq).StoreOne(pr)
|
|
|
|
|
if err != nil {
|
|
|
|
|
fmt.Println("ListenNATS: could not store purchase:", err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
storedID := stored.GetID()
|
|
|
|
|
time.AfterFunc(10*time.Minute, func() { draftTimeout(storedID, tools.PURCHASE_RESOURCE) })
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func handleCreateResource(resp tools.NATSResponse) {
|
|
|
|
|
switch resp.Datatype {
|
|
|
|
|
case tools.WORKFLOW:
|
|
|
|
|
wf := workflow.Workflow{}
|
|
|
|
|
if err := json.Unmarshal(resp.Payload, &wf); err != nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
broadcastPlanner(&wf)
|
|
|
|
|
notifyWorkflowWatchers(wf.GetID())
|
|
|
|
|
case tools.BOOKING:
|
|
|
|
|
var bk booking.Booking
|
|
|
|
|
if err := json.Unmarshal(resp.Payload, &bk); err != nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
self, err := oclib.GetMySelf()
|
|
|
|
|
/*if err != nil || self == nil || bk.DestPeerID != self.GetID() {
|
|
|
|
|
return
|
|
|
|
|
}*/
|
|
|
|
|
adminReq := &tools.APIRequest{Admin: true}
|
|
|
|
|
_ = err
|
|
|
|
|
handleCreateBooking(&bk, self, adminReq)
|
|
|
|
|
case tools.PURCHASE_RESOURCE:
|
|
|
|
|
var pr purchase_resource.PurchaseResource
|
|
|
|
|
if err := json.Unmarshal(resp.Payload, &pr); err != nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
self, err := oclib.GetMySelf()
|
|
|
|
|
if err != nil || self == nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
adminReq := &tools.APIRequest{Admin: true}
|
|
|
|
|
handleCreatePurchase(&pr, self, adminReq)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// confirmResource sets IsDraft=false for a booking or purchase resource.
|
|
|
|
|
// For bookings it also advances State to SCHEDULED and refreshes the local planner.
|
|
|
|
|
func confirmResource(id string, dt tools.DataType) {
|
|
|
|
|
adminReq := &tools.APIRequest{Admin: true}
|
|
|
|
|
switch dt {
|
|
|
|
|
case tools.BOOKING:
|
|
|
|
|
res, _, err := booking.NewAccessor(adminReq).LoadOne(id)
|
|
|
|
|
if err != nil || res == nil {
|
|
|
|
|
fmt.Printf("confirmResource: could not load booking %s: %v\n", id, err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
bk := res.(*booking.Booking)
|
|
|
|
|
bk.IsDraft = false
|
|
|
|
|
bk.State = enum.SCHEDULED
|
|
|
|
|
if _, _, err := utils.GenericRawUpdateOne(bk, id, booking.NewAccessor(adminReq)); err != nil {
|
|
|
|
|
fmt.Printf("confirmResource: could not confirm booking %s: %v\n", id, err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
self, err := oclib.GetMySelf()
|
|
|
|
|
if err == nil && self != nil {
|
|
|
|
|
go refreshSelfPlanner(self.PeerID, adminReq)
|
|
|
|
|
}
|
|
|
|
|
case tools.PURCHASE_RESOURCE:
|
|
|
|
|
res, _, err := purchase_resource.NewAccessor(adminReq).LoadOne(id)
|
|
|
|
|
if err != nil || res == nil {
|
|
|
|
|
fmt.Printf("confirmResource: could not load purchase %s: %v\n", id, err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
pr := res.(*purchase_resource.PurchaseResource)
|
|
|
|
|
pr.IsDraft = false
|
|
|
|
|
if _, _, err := utils.GenericRawUpdateOne(pr, id, purchase_resource.NewAccessor(adminReq)); err != nil {
|
|
|
|
|
fmt.Printf("confirmResource: could not confirm purchase %s: %v\n", id, err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|