package infrastructure import ( "encoding/json" "fmt" "cloud.o-forge.io/core/oc-lib/models/booking" "cloud.o-forge.io/core/oc-lib/models/resources/purchase_resource" "cloud.o-forge.io/core/oc-lib/models/utils" "cloud.o-forge.io/core/oc-lib/tools" ) // --------------------------------------------------------------------------- // NATS emission // --------------------------------------------------------------------------- func EmitNATS(peerID string, message tools.PropalgationMessage) { // PB_CLOSE_PLANNER: notify local watchers so streams re-evaluate. // Cache mutations (eviction or ownership reset) are the caller's // responsibility — see evictAfter and ReleaseRefreshOwnership. if message.Action == tools.PB_CLOSE_PLANNER { notifyPlannerWatchers(peerID) } b, _ := json.Marshal(message) tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{ FromApp: "oc-scheduler", Datatype: -1, Method: int(tools.PROPALGATION_EVENT), Payload: b, }) } // --------------------------------------------------------------------------- // NATS listeners // --------------------------------------------------------------------------- func ListenNATS() { tools.NewNATSCaller().ListenNats(map[tools.NATSMethod]func(tools.NATSResponse){ tools.PLANNER_EXECUTION: handlePlannerExecution, tools.CONSIDERS_EVENT: handleConsidersEvent, tools.REMOVE_RESOURCE: handleRemoveResource, tools.CREATE_RESOURCE: handleCreateResource, tools.CONFIRM_EVENT: handleConfirm, }) } // --------------------------------------------------------------------------- // Draft timeout // --------------------------------------------------------------------------- // draftTimeout deletes a booking or purchase resource if it is still a draft // after the 10-minute confirmation window has elapsed. func draftTimeout(id string, dt tools.DataType) { adminReq := &tools.APIRequest{Admin: true} var res utils.DBObject var loadErr error switch dt { case tools.BOOKING: res, _, loadErr = booking.NewAccessor(adminReq).LoadOne(id) case tools.PURCHASE_RESOURCE: res, _, loadErr = purchase_resource.NewAccessor(adminReq).LoadOne(id) default: return } if loadErr != nil || res == nil || !res.IsDrafted() { return } switch dt { case tools.BOOKING: booking.NewAccessor(adminReq).DeleteOne(id) case tools.PURCHASE_RESOURCE: purchase_resource.NewAccessor(adminReq).DeleteOne(id) } fmt.Printf("draftTimeout: %s %s deleted (still draft after 10 min)\n", dt.String(), id) }