From e422efd2678ce560a93151ce693750c137ac217f Mon Sep 17 00:00:00 2001 From: mr Date: Tue, 17 Mar 2026 15:12:29 +0100 Subject: [PATCH] Proper checkUp --- controllers/sheduler.go | 18 ++++++++++----- go.mod | 2 +- go.sum | 2 ++ infrastructure/nats.go | 40 ++++----------------------------- infrastructure/nats_handlers.go | 23 ++++--------------- infrastructure/scheduler.go | 22 ------------------ main.go | 1 - 7 files changed, 23 insertions(+), 85 deletions(-) diff --git a/controllers/sheduler.go b/controllers/sheduler.go index f185644..28185ac 100644 --- a/controllers/sheduler.go +++ b/controllers/sheduler.go @@ -4,6 +4,7 @@ import ( "fmt" "net/http" "oc-scheduler/infrastructure" + "reflect" "strings" oclib "cloud.o-forge.io/core/oc-lib" @@ -160,15 +161,20 @@ func CheckStreamHandler(w http.ResponseWriter, r *http.Request) { confirmed = true return } - infrastructure.CleanupSession(selfID, executionsID, selfID, req) - // Detect whether the user changed dates or instances. - datesChanged := !updated.Start.Equal(ws.Start) || + changed := updated.Cron != ws.Cron || + !updated.Start.Equal(ws.Start) || updated.DurationS != ws.DurationS || (updated.End == nil) != (ws.End == nil) || - (updated.End != nil && ws.End != nil && !updated.End.Equal(*ws.End)) + (updated.End != nil && ws.End != nil && !updated.End.Equal(*ws.End)) || + updated.BookingMode != ws.BookingMode || + !reflect.DeepEqual(updated.SelectedBillingStrategy, ws.SelectedBillingStrategy) || + !reflect.DeepEqual(updated.SelectedInstances, ws.SelectedInstances) || + !reflect.DeepEqual(updated.SelectedPartnerships, ws.SelectedPartnerships) || + !reflect.DeepEqual(updated.SelectedBuyings, ws.SelectedBuyings) || + !reflect.DeepEqual(updated.SelectedStrategies, ws.SelectedStrategies) + infrastructure.CleanupSession(selfID, executionsID, selfID, req) ws = updated - // Reschedule when dates changed or we haven't scheduled yet. - if err := pushCheck(datesChanged || !scheduled); err != nil { + if err := pushCheck(changed || !scheduled); err != nil { return } diff --git a/go.mod b/go.mod index 642e0e6..3a84c1b 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module oc-scheduler go 1.25.0 require ( - cloud.o-forge.io/core/oc-lib v0.0.0-20260317090440-1ac735cef10e + cloud.o-forge.io/core/oc-lib v0.0.0-20260317135927-72be3118b7af github.com/beego/beego/v2 v2.3.8 github.com/google/uuid v1.6.0 github.com/robfig/cron v1.2.0 diff --git a/go.sum b/go.sum index 6a00bee..bb24fce 100644 --- a/go.sum +++ b/go.sum @@ -28,6 +28,8 @@ cloud.o-forge.io/core/oc-lib v0.0.0-20260317083202-65237f0d1f3f h1:X8ytAjBzEqnFL cloud.o-forge.io/core/oc-lib v0.0.0-20260317083202-65237f0d1f3f/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= cloud.o-forge.io/core/oc-lib v0.0.0-20260317090440-1ac735cef10e h1:e/oYMPAqD27l3Rd473Xny/2Ut/LZnBYXAzfQArNOmrs= cloud.o-forge.io/core/oc-lib v0.0.0-20260317090440-1ac735cef10e/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= +cloud.o-forge.io/core/oc-lib v0.0.0-20260317135927-72be3118b7af h1:IySCYxJrKUpmRa2R3hXSaYxfWf/cm28NRpmwluEmzBI= +cloud.o-forge.io/core/oc-lib v0.0.0-20260317135927-72be3118b7af/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/Masterminds/semver/v3 v3.4.0 h1:Zog+i5UMtVoCU8oKka5P7i9q9HgrJeGzI9SA1Xbatp0= github.com/Masterminds/semver/v3 v3.4.0/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM= diff --git a/infrastructure/nats.go b/infrastructure/nats.go index 8ca421b..c60b87a 100644 --- a/infrastructure/nats.go +++ b/infrastructure/nats.go @@ -5,16 +5,12 @@ import ( "encoding/json" "fmt" "oc-scheduler/conf" - "sync" - "time" oclib "cloud.o-forge.io/core/oc-lib" - "cloud.o-forge.io/core/oc-lib/config" "cloud.o-forge.io/core/oc-lib/models/booking" "cloud.o-forge.io/core/oc-lib/models/resources/purchase_resource" "cloud.o-forge.io/core/oc-lib/models/utils" "cloud.o-forge.io/core/oc-lib/tools" - "github.com/nats-io/nats.go" ) // --------------------------------------------------------------------------- @@ -43,42 +39,14 @@ func EmitNATS(peerID string, message tools.PropalgationMessage) { func ListenNATS() { tools.NewNATSCaller().ListenNats(map[tools.NATSMethod]func(tools.NATSResponse){ - tools.PLANNER_EXECUTION: handlePlannerExecution, + tools.PLANNER_EXECUTION: handlePlannerExecution, tools.PROPALGATION_EVENT: handlePropagationEvent, - tools.REMOVE_RESOURCE: handleRemoveResource, - tools.CREATE_RESOURCE: handleCreateResource, + tools.REMOVE_RESOURCE: handleRemoveResource, + tools.CREATE_RESOURCE: handleCreateResource, + tools.CONFIRM_EVENT: handleConfirm, }) } -// --------------------------------------------------------------------------- -// Confirm channels -// --------------------------------------------------------------------------- - -// ListenConfirm opens a direct NATS connection and subscribes to the hardcoded -// "confirm_booking" and "confirm_purchase" subjects. It reconnects automatically -// if the connection is lost. -func ListenConfirm() { - natsURL := config.GetConfig().NATSUrl - if natsURL == "" { - fmt.Println("ListenConfirm: NATS_SERVER not set, skipping confirm listeners") - return - } - for { - nc, err := nats.Connect(natsURL) - if err != nil { - fmt.Println("ListenConfirm: could not connect to NATS:", err) - time.Sleep(time.Minute) - continue - } - var wg sync.WaitGroup - wg.Add(2) - go listenConfirmChannel(nc, "confirm_booking", tools.BOOKING, &wg) - go listenConfirmChannel(nc, "confirm_purchase", tools.PURCHASE_RESOURCE, &wg) - wg.Wait() - nc.Close() - } -} - // --------------------------------------------------------------------------- // Draft timeout // --------------------------------------------------------------------------- diff --git a/infrastructure/nats_handlers.go b/infrastructure/nats_handlers.go index 0847216..0bab3b2 100644 --- a/infrastructure/nats_handlers.go +++ b/infrastructure/nats_handlers.go @@ -3,7 +3,6 @@ package infrastructure import ( "encoding/json" "fmt" - "sync" "time" oclib "cloud.o-forge.io/core/oc-lib" @@ -15,9 +14,12 @@ import ( "cloud.o-forge.io/core/oc-lib/models/utils" "cloud.o-forge.io/core/oc-lib/models/workflow" "cloud.o-forge.io/core/oc-lib/tools" - "github.com/nats-io/nats.go" ) +func handleConfirm(resp tools.NATSResponse) { + confirmResource(string(resp.Payload), resp.Datatype) +} + func handlePlannerExecution(resp tools.NATSResponse) { m := map[string]interface{}{} p := planner.Planner{} @@ -255,20 +257,3 @@ func confirmResource(id string, dt tools.DataType) { } } } - -// listenConfirmChannel subscribes to a NATS subject and calls confirmResource -// for each message received. The message body is expected to be the plain -// resource ID (UTF-8 string). -func listenConfirmChannel(nc *nats.Conn, subject string, dt tools.DataType, wg *sync.WaitGroup) { - defer wg.Done() - ch := make(chan *nats.Msg, 64) - sub, err := nc.ChanSubscribe(subject, ch) - if err != nil { - fmt.Printf("listenConfirmChannel: could not subscribe to %s: %v\n", subject, err) - return - } - defer sub.Unsubscribe() - for msg := range ch { - confirmResource(string(msg.Data), dt) - } -} diff --git a/infrastructure/scheduler.go b/infrastructure/scheduler.go index 695dd81..5058d75 100644 --- a/infrastructure/scheduler.go +++ b/infrastructure/scheduler.go @@ -52,20 +52,6 @@ type WorkflowSchedule struct { Confirm bool `json:"confirm,omitempty"` } -// TODO PREEMPTION ! -/* -To schedule a preempted, omg. -pour faire ça on doit alors lancé une exécution prioritaire qui passera devant toutes les autres, celon un niveau de priorité. -Preemptible = 7, pour le moment il n'existera que 0 et 7. -Dans le cas d'une préemption l'exécution est immédiable et bloquera tout le monde tant qu'il n'a pas été exécuté. -Une ressource doit pouvoir être preemptible pour être exécutée de la sorte. -Se qui implique si on est sur une ressource par ressource que si un élement n'est pas préemptible, -alors il devra être effectué dés que possible - -Dans le cas dés que possible, la start date est immédiate MAIS ! -ne pourra se lancé que SI il n'existe pas d'exécution se lançant durant la période indicative. ( Ultra complexe ) -*/ - func NewScheduler(mode int, start string, end string, durationInS float64, cron string) *WorkflowSchedule { ws := &WorkflowSchedule{ UUID: uuid.New().String(), @@ -246,14 +232,6 @@ func propagateResource(obj utils.DBObject, destPeerID string, dt tools.DataType, errCh <- nil } -/* -BOOKING IMPLIED TIME, not of subscription but of execution -so is processing time execution time applied on computes -data can improve the processing time -time should implied a security time border (10sec) if not from the same executions -VERIFY THAT WE HANDLE DIFFERENCE BETWEEN LOCATION TIME && BOOKING -*/ - /* * getExecutions is a function that returns the executions of a workflow * it returns an array of workflow_execution.WorkflowExecution diff --git a/main.go b/main.go index 0080f5c..ac629d7 100644 --- a/main.go +++ b/main.go @@ -35,7 +35,6 @@ func main() { go infrastructure.ListenNATS() go infrastructure.InitSelfPlanner() - go infrastructure.ListenConfirm() go infrastructure.RecoverDraftExecutions() beego.Run() }