// Package infrastructure holds the scheduler-side planner cache, the
// refresh-ownership bookkeeping, and the subscriber registries used by
// check sessions to react to planner/workflow changes.
package infrastructure

import (
	"encoding/json"
	"fmt"
	"slices"
	"sync"
	"time"

	oclib "cloud.o-forge.io/core/oc-lib"
	"cloud.o-forge.io/core/oc-lib/models/booking/planner"
	"cloud.o-forge.io/core/oc-lib/models/workflow"
	"cloud.o-forge.io/core/oc-lib/models/workflow/graph"
	"cloud.o-forge.io/core/oc-lib/tools"
)

// plannerTTL is how long a cached planner snapshot lives before it is
// automatically evicted (see evictAfter).
const plannerTTL = 24 * time.Hour

// ---------------------------------------------------------------------------
// Planner cache — protected by plannerMu
// ---------------------------------------------------------------------------

// plannerEntry wraps a planner snapshot with refresh-ownership tracking.
// At most one check session may be the "refresh owner" of a given peer's
// planner at a time: it emits PB_PLANNER to request a fresh snapshot from
// oc-discovery and, on close (clean or forced), emits PB_CLOSE_PLANNER to
// release the stream. Any subsequent session that needs the same peer's
// planner will see Refreshing=true and skip the duplicate request.
type plannerEntry struct {
	Planner      *planner.Planner
	Refreshing   bool   // true while a PB_PLANNER request is in flight
	RefreshOwner string // session UUID that initiated the current refresh
}

var plannerMu sync.RWMutex
var PlannerCache = map[string]*plannerEntry{}
var plannerAddedAt = map[string]time.Time{} // peerID → first-seen timestamp

// ---------------------------------------------------------------------------
// Subscriber registries — one keyed by peerID, one by workflowID
// ---------------------------------------------------------------------------

var subsMu sync.RWMutex
var plannerSubs = map[string][]chan string{}     // peerID → channels (deliver peerID)
var workflowSubs = map[string][]chan struct{}{}  // workflowID → notification channels
func subscribePlanners(peerIDs []string) (<-chan string, func()) { ch := make(chan string, 1) subsMu.Lock() for _, k := range peerIDs { plannerSubs[k] = append(plannerSubs[k], ch) } subsMu.Unlock() cancel := func() { subsMu.Lock() for _, k := range peerIDs { subs := plannerSubs[k] for i, s := range subs { if s == ch { plannerSubs[k] = append(subs[:i], subs[i+1:]...) break } } } subsMu.Unlock() } return ch, cancel } // SubscribePlannerUpdates registers interest in planner changes for the given // peer IDs. The returned channel receives the peerID string (non-blocking) each // time any of those planners is updated. Call cancel to unregister. func SubscribePlannerUpdates(peerIDs []string) (<-chan string, func()) { return subscribePlanners(peerIDs) } // SubscribeWorkflowUpdates registers interest in workflow modifications for the // given workflow ID. The returned channel is signalled when the workflow changes // (peer list may have grown or shrunk). Call cancel to unregister. func SubscribeWorkflowUpdates(wfID string) (<-chan struct{}, func()) { ch, cancel := subscribe(&subsMu, workflowSubs, []string{wfID}) return ch, cancel } // subscribe is the generic helper used by the workflow registry. func subscribe(mu *sync.RWMutex, registry map[string][]chan struct{}, keys []string) (<-chan struct{}, func()) { ch := make(chan struct{}, 1) mu.Lock() for _, k := range keys { registry[k] = append(registry[k], ch) } mu.Unlock() cancel := func() { mu.Lock() for _, k := range keys { subs := registry[k] for i, s := range subs { if s == ch { registry[k] = append(subs[:i], subs[i+1:]...) 
break } } } mu.Unlock() } return ch, cancel } func notifyPlannerWatchers(peerID string) { subsMu.RLock() subs := plannerSubs[peerID] subsMu.RUnlock() for _, ch := range subs { select { case ch <- peerID: default: } } } func notifyWorkflowWatchers(wfID string) { notify(&subsMu, workflowSubs, wfID) } func notify(mu *sync.RWMutex, registry map[string][]chan struct{}, key string) { mu.RLock() subs := registry[key] mu.RUnlock() for _, ch := range subs { select { case ch <- struct{}{}: default: } } } // --------------------------------------------------------------------------- // Cache helpers // --------------------------------------------------------------------------- // storePlanner inserts or updates the planner snapshot for peerID. // On first insertion it schedules an automatic eviction after plannerTTL. // Existing refresh-ownership state (Refreshing / RefreshOwner) is preserved // so that an in-flight request is not inadvertently reset. // All subscribers interested in this peer are notified. func storePlanner(peerID string, p *planner.Planner) { plannerMu.Lock() entry := PlannerCache[peerID] isNew := entry == nil if isNew { entry = &plannerEntry{} PlannerCache[peerID] = entry plannerAddedAt[peerID] = time.Now().UTC() go evictAfter(peerID, plannerTTL) } entry.Planner = p plannerMu.Unlock() notifyPlannerWatchers(peerID) } // evictAfter waits ttl from first insertion then deletes the cache entry and // emits PB_CLOSE_PLANNER so oc-discovery stops streaming for this peer. // This is the only path that actually removes an entry from PlannerCache; // session close (ReleaseRefreshOwnership) only resets ownership state. 
func evictAfter(peerID string, ttl time.Duration) { time.Sleep(ttl) plannerMu.Lock() _, exists := PlannerCache[peerID] if exists { delete(PlannerCache, peerID) delete(plannerAddedAt, peerID) } plannerMu.Unlock() if exists { EmitNATS(peerID, tools.PropalgationMessage{Action: tools.PB_CLOSE_PLANNER}) } } // --------------------------------------------------------------------------- // Planner refresh / broadcast // --------------------------------------------------------------------------- // RequestPlannerRefresh asks oc-discovery for a fresh planner snapshot for // each peer in peerIDs. Only the first session to request a given peer becomes // its "refresh owner": subsequent sessions see Refreshing=true and skip the // duplicate PB_PLANNER emission. Returns the subset of peerIDs for which this // session claimed ownership (needed to release on close). func RequestPlannerRefresh(peerIDs []string, executionsID string) []string { var owned []string for _, peerID := range peerIDs { plannerMu.Lock() entry := PlannerCache[peerID] if entry == nil { entry = &plannerEntry{} PlannerCache[peerID] = entry plannerAddedAt[peerID] = time.Now().UTC() go evictAfter(peerID, plannerTTL) } shouldRequest := !entry.Refreshing if shouldRequest { entry.Refreshing = true entry.RefreshOwner = executionsID } plannerMu.Unlock() if shouldRequest { owned = append(owned, peerID) if p, err := oclib.GetMySelf(); err == nil && p != nil && p.PeerID == peerID { // Self peer: generate and cache the planner directly without // going through NATS / oc-discovery. go refreshSelfPlanner(peerID, &tools.APIRequest{Admin: true}) } else { payload, _ := json.Marshal(map[string]any{"peer_id": peerID}) fmt.Println("PB_PLANNER", peerID) EmitNATS(peerID, tools.PropalgationMessage{ Action: tools.PB_PLANNER, Payload: payload, }) } } } return owned } // ReleaseRefreshOwnership is called when a check session closes (clean or // forced). 
For each peer this session owns, it resets the refresh state and // emits PB_CLOSE_PLANNER so oc-discovery stops the planner stream. // The planner data itself stays in the cache until TTL eviction. func ReleaseRefreshOwnership(peerIDs []string, executionsID string) { for _, peerID := range peerIDs { plannerMu.Lock() if entry := PlannerCache[peerID]; entry != nil && entry.RefreshOwner == executionsID { entry.Refreshing = false entry.RefreshOwner = "" } plannerMu.Unlock() payload, _ := json.Marshal(map[string]any{"peer_id": peerID}) EmitNATS(peerID, tools.PropalgationMessage{ Action: tools.PB_CLOSE_PLANNER, Payload: payload, }) } } // broadcastPlanner iterates the storage and compute peers of the given workflow // and, for each peer not yet in the cache, emits a PB_PLANNER propagation so // downstream consumers (oc-discovery, other schedulers) refresh their state. func broadcastPlanner(wf *workflow.Workflow) { if wf.Graph == nil { return } items := []graph.GraphItem{} items = append(items, wf.GetGraphItems(wf.Graph.IsStorage)...) items = append(items, wf.GetGraphItems(wf.Graph.IsCompute)...) seen := []string{} for _, item := range items { i := item _, res := i.GetResource() if res == nil { continue } creatorID := res.GetCreatorID() if slices.Contains(seen, creatorID) { continue } data := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil).LoadOne(creatorID) p := data.ToPeer() if p == nil { continue } plannerMu.RLock() cached := PlannerCache[p.PeerID] plannerMu.RUnlock() // Only request if no snapshot and no refresh already in flight. 
if cached == nil || (cached.Planner == nil && !cached.Refreshing) { payload, err := json.Marshal(map[string]interface{}{"peer_id": p.PeerID}) if err != nil { continue } seen = append(seen, creatorID) EmitNATS(p.PeerID, tools.PropalgationMessage{ Action: tools.PB_PLANNER, Payload: payload, }) } } } // --------------------------------------------------------------------------- // Self-planner initialisation // --------------------------------------------------------------------------- // InitSelfPlanner bootstraps our own planner entry at startup. // It waits (with 15-second retries) for our peer record to be present in the // database before generating the first planner snapshot and broadcasting it // on PB_PLANNER. This handles the race between oc-scheduler starting before // oc-peer has fully registered our node. func InitSelfPlanner() { for { self, err := oclib.GetMySelf() if err != nil || self == nil { fmt.Println("InitSelfPlanner: self peer not found yet, retrying in 15s...") time.Sleep(15 * time.Second) continue } refreshSelfPlanner(self.PeerID, &tools.APIRequest{Admin: true}) return } } // --------------------------------------------------------------------------- // Self-planner refresh // --------------------------------------------------------------------------- // refreshSelfPlanner regenerates the local planner from the current state of // the booking DB, stores it in PlannerCache under our own node UUID, and // broadcasts it on PROPALGATION_EVENT / PB_PLANNER so all listeners (including // oc-discovery) are kept in sync. // // It should be called whenever a booking for our own peer is created, whether // by direct DB insertion (self-peer routing) or upon receiving a CREATE_RESOURCE // BOOKING message from oc-discovery. 
func refreshSelfPlanner(peerID string, request *tools.APIRequest) { p, err := planner.GenerateShallow(request) if err != nil { fmt.Println("refreshSelfPlanner: could not generate planner:", err) return } // Update the local cache and notify any waiting CheckStream goroutines. storePlanner(peerID, p) // Broadcast the updated planner so remote peers (and oc-discovery) can // refresh their view of our availability. type plannerWithPeer struct { PeerID string `json:"peer_id"` *planner.Planner } plannerPayload, err := json.Marshal(plannerWithPeer{PeerID: peerID, Planner: p}) if err != nil { return } EmitNATS(peerID, tools.PropalgationMessage{ Action: tools.PB_PLANNER, Payload: plannerPayload, }) }