354 lines
11 KiB
Go
354 lines
11 KiB
Go
package infrastructure
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"slices"
|
|
"sync"
|
|
"time"
|
|
|
|
oclib "cloud.o-forge.io/core/oc-lib"
|
|
"cloud.o-forge.io/core/oc-lib/models/booking/planner"
|
|
"cloud.o-forge.io/core/oc-lib/models/workflow"
|
|
"cloud.o-forge.io/core/oc-lib/models/workflow/graph"
|
|
"cloud.o-forge.io/core/oc-lib/tools"
|
|
)
|
|
|
|
// plannerTTL is how long a planner snapshot stays in PlannerCache before
// evictAfter removes it and emits PB_CLOSE_PLANNER.
const plannerTTL = 24 * time.Hour
|
|
|
|
// ---------------------------------------------------------------------------
// Planner cache — protected by plannerMu
// ---------------------------------------------------------------------------

// plannerEntry wraps a planner snapshot with refresh-ownership tracking.
// At most one check session may be the "refresh owner" of a given peer's
// planner at a time: it emits PB_PLANNER to request a fresh snapshot from
// oc-discovery and, on close (clean or forced), emits PB_CLOSE_PLANNER to
// release the stream. Any subsequent session that needs the same peer's
// planner will see Refreshing=true and skip the duplicate request.
type plannerEntry struct {
	Planner *planner.Planner // latest known snapshot; nil until the first refresh completes
	Refreshing bool // true while a PB_PLANNER request is in flight
	RefreshOwner string // session UUID that initiated the current refresh
}
|
|
|
|
var plannerMu sync.RWMutex                    // guards PlannerCache and plannerAddedAt
var PlannerCache = map[string]*plannerEntry{} // peerID → snapshot + refresh-ownership state
var plannerAddedAt = map[string]time.Time{}   // peerID → first-seen timestamp
|
|
|
|
// ---------------------------------------------------------------------------
// Subscriber registries — one keyed by peerID, one by workflowID
// ---------------------------------------------------------------------------

var subsMu sync.RWMutex                         // guards plannerSubs and workflowSubs
var plannerSubs = map[string][]chan string{}    // peerID → channels (deliver peerID)
var workflowSubs = map[string][]chan struct{}{} // workflowID → notification channels
|
|
|
|
// subscribePlanners registers interest in planner changes for the given peer IDs.
|
|
// The returned channel receives the peerID string (non-blocking) each time any
|
|
// of those planners is updated. Call cancel to unregister.
|
|
func subscribePlanners(peerIDs []string) (<-chan string, func()) {
|
|
ch := make(chan string, 1)
|
|
subsMu.Lock()
|
|
for _, k := range peerIDs {
|
|
plannerSubs[k] = append(plannerSubs[k], ch)
|
|
}
|
|
subsMu.Unlock()
|
|
cancel := func() {
|
|
subsMu.Lock()
|
|
for _, k := range peerIDs {
|
|
subs := plannerSubs[k]
|
|
for i, s := range subs {
|
|
if s == ch {
|
|
plannerSubs[k] = append(subs[:i], subs[i+1:]...)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
subsMu.Unlock()
|
|
}
|
|
return ch, cancel
|
|
}
|
|
|
|
// SubscribePlannerUpdates registers interest in planner changes for the given
|
|
// peer IDs. The returned channel receives the peerID string (non-blocking) each
|
|
// time any of those planners is updated. Call cancel to unregister.
|
|
func SubscribePlannerUpdates(peerIDs []string) (<-chan string, func()) {
|
|
return subscribePlanners(peerIDs)
|
|
}
|
|
|
|
// SubscribeWorkflowUpdates registers interest in workflow modifications for the
|
|
// given workflow ID. The returned channel is signalled when the workflow changes
|
|
// (peer list may have grown or shrunk). Call cancel to unregister.
|
|
func SubscribeWorkflowUpdates(wfID string) (<-chan struct{}, func()) {
|
|
ch, cancel := subscribe(&subsMu, workflowSubs, []string{wfID})
|
|
return ch, cancel
|
|
}
|
|
|
|
// subscribe is the generic helper used by the workflow registry.
|
|
func subscribe(mu *sync.RWMutex, registry map[string][]chan struct{}, keys []string) (<-chan struct{}, func()) {
|
|
ch := make(chan struct{}, 1)
|
|
mu.Lock()
|
|
for _, k := range keys {
|
|
registry[k] = append(registry[k], ch)
|
|
}
|
|
mu.Unlock()
|
|
cancel := func() {
|
|
mu.Lock()
|
|
for _, k := range keys {
|
|
subs := registry[k]
|
|
for i, s := range subs {
|
|
if s == ch {
|
|
registry[k] = append(subs[:i], subs[i+1:]...)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
mu.Unlock()
|
|
}
|
|
return ch, cancel
|
|
}
|
|
|
|
func notifyPlannerWatchers(peerID string) {
|
|
subsMu.RLock()
|
|
subs := plannerSubs[peerID]
|
|
subsMu.RUnlock()
|
|
for _, ch := range subs {
|
|
select {
|
|
case ch <- peerID:
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
|
|
func notifyWorkflowWatchers(wfID string) {
|
|
notify(&subsMu, workflowSubs, wfID)
|
|
}
|
|
|
|
func notify(mu *sync.RWMutex, registry map[string][]chan struct{}, key string) {
|
|
mu.RLock()
|
|
subs := registry[key]
|
|
mu.RUnlock()
|
|
for _, ch := range subs {
|
|
select {
|
|
case ch <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Cache helpers
|
|
// ---------------------------------------------------------------------------
|
|
|
|
// storePlanner inserts or updates the planner snapshot for peerID.
|
|
// On first insertion it schedules an automatic eviction after plannerTTL.
|
|
// Existing refresh-ownership state (Refreshing / RefreshOwner) is preserved
|
|
// so that an in-flight request is not inadvertently reset.
|
|
// All subscribers interested in this peer are notified.
|
|
func storePlanner(peerID string, p *planner.Planner) {
|
|
plannerMu.Lock()
|
|
entry := PlannerCache[peerID]
|
|
isNew := entry == nil
|
|
if isNew {
|
|
entry = &plannerEntry{}
|
|
PlannerCache[peerID] = entry
|
|
plannerAddedAt[peerID] = time.Now()
|
|
go evictAfter(peerID, plannerTTL)
|
|
}
|
|
entry.Planner = p
|
|
plannerMu.Unlock()
|
|
notifyPlannerWatchers(peerID)
|
|
}
|
|
|
|
// evictAfter waits ttl from first insertion then deletes the cache entry and
|
|
// emits PB_CLOSE_PLANNER so oc-discovery stops streaming for this peer.
|
|
// This is the only path that actually removes an entry from PlannerCache;
|
|
// session close (ReleaseRefreshOwnership) only resets ownership state.
|
|
func evictAfter(peerID string, ttl time.Duration) {
|
|
time.Sleep(ttl)
|
|
plannerMu.Lock()
|
|
_, exists := PlannerCache[peerID]
|
|
if exists {
|
|
delete(PlannerCache, peerID)
|
|
delete(plannerAddedAt, peerID)
|
|
}
|
|
plannerMu.Unlock()
|
|
if exists {
|
|
EmitNATS(peerID, tools.PropalgationMessage{Action: tools.PB_CLOSE_PLANNER})
|
|
}
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Planner refresh / broadcast
|
|
// ---------------------------------------------------------------------------
|
|
|
|
// RequestPlannerRefresh asks oc-discovery for a fresh planner snapshot for
|
|
// each peer in peerIDs. Only the first session to request a given peer becomes
|
|
// its "refresh owner": subsequent sessions see Refreshing=true and skip the
|
|
// duplicate PB_PLANNER emission. Returns the subset of peerIDs for which this
|
|
// session claimed ownership (needed to release on close).
|
|
func RequestPlannerRefresh(peerIDs []string, executionsID string) []string {
|
|
var owned []string
|
|
for _, peerID := range peerIDs {
|
|
plannerMu.Lock()
|
|
entry := PlannerCache[peerID]
|
|
if entry == nil {
|
|
entry = &plannerEntry{}
|
|
PlannerCache[peerID] = entry
|
|
plannerAddedAt[peerID] = time.Now()
|
|
go evictAfter(peerID, plannerTTL)
|
|
}
|
|
shouldRequest := !entry.Refreshing
|
|
if shouldRequest {
|
|
entry.Refreshing = true
|
|
entry.RefreshOwner = executionsID
|
|
}
|
|
plannerMu.Unlock()
|
|
if shouldRequest {
|
|
owned = append(owned, peerID)
|
|
if p, err := oclib.GetMySelf(); err == nil && p != nil && p.PeerID == peerID {
|
|
// Self peer: generate and cache the planner directly without
|
|
// going through NATS / oc-discovery.
|
|
go refreshSelfPlanner(peerID, &tools.APIRequest{Admin: true})
|
|
} else {
|
|
payload, _ := json.Marshal(map[string]any{"peer_id": peerID})
|
|
fmt.Println("PB_PLANNER", peerID)
|
|
EmitNATS(peerID, tools.PropalgationMessage{
|
|
Action: tools.PB_PLANNER,
|
|
Payload: payload,
|
|
})
|
|
}
|
|
}
|
|
}
|
|
return owned
|
|
}
|
|
|
|
// ReleaseRefreshOwnership is called when a check session closes (clean or
|
|
// forced). For each peer this session owns, it resets the refresh state and
|
|
// emits PB_CLOSE_PLANNER so oc-discovery stops the planner stream.
|
|
// The planner data itself stays in the cache until TTL eviction.
|
|
func ReleaseRefreshOwnership(peerIDs []string, executionsID string) {
|
|
for _, peerID := range peerIDs {
|
|
plannerMu.Lock()
|
|
if entry := PlannerCache[peerID]; entry != nil && entry.RefreshOwner == executionsID {
|
|
entry.Refreshing = false
|
|
entry.RefreshOwner = ""
|
|
}
|
|
plannerMu.Unlock()
|
|
payload, _ := json.Marshal(map[string]any{"peer_id": peerID})
|
|
EmitNATS(peerID, tools.PropalgationMessage{
|
|
Action: tools.PB_CLOSE_PLANNER,
|
|
Payload: payload,
|
|
})
|
|
}
|
|
}
|
|
|
|
// broadcastPlanner iterates the storage and compute peers of the given workflow
|
|
// and, for each peer not yet in the cache, emits a PB_PLANNER propagation so
|
|
// downstream consumers (oc-discovery, other schedulers) refresh their state.
|
|
func broadcastPlanner(wf *workflow.Workflow) {
|
|
if wf.Graph == nil {
|
|
return
|
|
}
|
|
items := []graph.GraphItem{}
|
|
items = append(items, wf.GetGraphItems(wf.Graph.IsStorage)...)
|
|
items = append(items, wf.GetGraphItems(wf.Graph.IsCompute)...)
|
|
|
|
seen := []string{}
|
|
for _, item := range items {
|
|
i := item
|
|
_, res := i.GetResource()
|
|
if res == nil {
|
|
continue
|
|
}
|
|
creatorID := res.GetCreatorID()
|
|
if slices.Contains(seen, creatorID) {
|
|
continue
|
|
}
|
|
|
|
data := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil).LoadOne(creatorID)
|
|
p := data.ToPeer()
|
|
if p == nil {
|
|
continue
|
|
}
|
|
|
|
plannerMu.RLock()
|
|
cached := PlannerCache[p.PeerID]
|
|
plannerMu.RUnlock()
|
|
|
|
// Only request if no snapshot and no refresh already in flight.
|
|
if cached == nil || (cached.Planner == nil && !cached.Refreshing) {
|
|
payload, err := json.Marshal(map[string]interface{}{"peer_id": p.PeerID})
|
|
if err != nil {
|
|
continue
|
|
}
|
|
seen = append(seen, creatorID)
|
|
EmitNATS(p.PeerID, tools.PropalgationMessage{
|
|
Action: tools.PB_PLANNER,
|
|
Payload: payload,
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Self-planner initialisation
|
|
// ---------------------------------------------------------------------------
|
|
|
|
// InitSelfPlanner bootstraps our own planner entry at startup.
|
|
// It waits (with 15-second retries) for our peer record to be present in the
|
|
// database before generating the first planner snapshot and broadcasting it
|
|
// on PB_PLANNER. This handles the race between oc-scheduler starting before
|
|
// oc-peer has fully registered our node.
|
|
func InitSelfPlanner() {
|
|
for {
|
|
self, err := oclib.GetMySelf()
|
|
if err != nil || self == nil {
|
|
fmt.Println("InitSelfPlanner: self peer not found yet, retrying in 15s...")
|
|
time.Sleep(15 * time.Second)
|
|
continue
|
|
}
|
|
refreshSelfPlanner(self.PeerID, &tools.APIRequest{Admin: true})
|
|
return
|
|
}
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Self-planner refresh
|
|
// ---------------------------------------------------------------------------
|
|
|
|
// refreshSelfPlanner regenerates the local planner from the current state of
|
|
// the booking DB, stores it in PlannerCache under our own node UUID, and
|
|
// broadcasts it on PROPALGATION_EVENT / PB_PLANNER so all listeners (including
|
|
// oc-discovery) are kept in sync.
|
|
//
|
|
// It should be called whenever a booking for our own peer is created, whether
|
|
// by direct DB insertion (self-peer routing) or upon receiving a CREATE_RESOURCE
|
|
// BOOKING message from oc-discovery.
|
|
func refreshSelfPlanner(peerID string, request *tools.APIRequest) {
|
|
p, err := planner.GenerateShallow(request)
|
|
if err != nil {
|
|
fmt.Println("refreshSelfPlanner: could not generate planner:", err)
|
|
return
|
|
}
|
|
|
|
// Update the local cache and notify any waiting CheckStream goroutines.
|
|
storePlanner(peerID, p)
|
|
|
|
// Broadcast the updated planner so remote peers (and oc-discovery) can
|
|
// refresh their view of our availability.
|
|
type plannerWithPeer struct {
|
|
PeerID string `json:"peer_id"`
|
|
*planner.Planner
|
|
}
|
|
plannerPayload, err := json.Marshal(plannerWithPeer{PeerID: peerID, Planner: p})
|
|
if err != nil {
|
|
return
|
|
}
|
|
EmitNATS(peerID, tools.PropalgationMessage{
|
|
Action: tools.PB_PLANNER,
|
|
Payload: plannerPayload,
|
|
})
|
|
}
|