package infrastructure import ( "context" "encoding/json" "fmt" "sync" "time" "oc-datacenter/infrastructure/minio" "oc-datacenter/infrastructure/storage" oclib "cloud.o-forge.io/core/oc-lib" "cloud.o-forge.io/core/oc-lib/dbs" bookingmodel "cloud.o-forge.io/core/oc-lib/models/booking" "cloud.o-forge.io/core/oc-lib/models/common/enum" "cloud.o-forge.io/core/oc-lib/models/workflow_execution" "cloud.o-forge.io/core/oc-lib/tools" "go.mongodb.org/mongo-driver/bson/primitive" ) // processedBookings tracks booking IDs already handled this process lifetime. var processedBookings sync.Map // closingStates is the set of terminal booking states. var closingStates = map[enum.BookingStatus]bool{ enum.FAILURE: true, enum.SUCCESS: true, enum.FORGOTTEN: true, enum.CANCELLED: true, } // WatchBookings is a safety-net fallback for when oc-monitord fails to launch. // It detects bookings that are past expected_start_date by at least 1 minute and // are still in a non-terminal state. Instead of writing to the database directly, // it emits WORKFLOW_STEP_DONE_EVENT with State=FAILURE on NATS so that oc-scheduler // handles the state transition — keeping a single source of truth for booking state. // // Must be launched in a goroutine from main. func WatchBookings() { logger := oclib.GetLogger() logger.Info().Msg("BookingWatchdog: started") ticker := time.NewTicker(time.Minute) defer ticker.Stop() for range ticker.C { if err := scanStaleBookings(); err != nil { logger.Error().Msg("BookingWatchdog: " + err.Error()) } } } // scanStaleBookings queries all bookings whose ExpectedStartDate passed more than // 1 minute ago. Non-terminal ones get a WORKFLOW_STEP_DONE_EVENT FAILURE emitted // on NATS so oc-scheduler closes them. func scanStaleBookings() error { myself, err := oclib.GetMySelf() if err != nil { return fmt.Errorf("could not resolve local peer: %w", err) } peerID := myself.GetID() deadline := time.Now().UTC().Add(-time.Minute) res := oclib.NewRequest(oclib.LibDataEnum(oclib.BOOKING), "", peerID, []string{}, nil). Search(&dbs.Filters{ And: map[string][]dbs.Filter{ "expected_start_date": {{ Operator: dbs.LTE.String(), Value: primitive.NewDateTimeFromTime(deadline), }}, }, }, "", false) if res.Err != "" { return fmt.Errorf("stale booking search failed: %s", res.Err) } for _, dbo := range res.Data { b, ok := dbo.(*bookingmodel.Booking) if !ok { continue } go emitWatchdogFailure(b) } return nil } // emitWatchdogFailure publishes a WORKFLOW_STEP_DONE_EVENT FAILURE for a stale // booking. oc-scheduler is the single authority for booking state transitions. func emitWatchdogFailure(b *bookingmodel.Booking) { logger := oclib.GetLogger() if _, done := processedBookings.Load(b.GetID()); done { return } if closingStates[b.State] { processedBookings.Store(b.GetID(), struct{}{}) return } now := time.Now().UTC() payload, err := json.Marshal(tools.WorkflowLifecycleEvent{ BookingID: b.GetID(), State: enum.FAILURE.EnumIndex(), RealEnd: &now, }) if err != nil { return } tools.NewNATSCaller().SetNATSPub(tools.WORKFLOW_STEP_DONE_EVENT, tools.NATSResponse{ FromApp: "oc-datacenter", Method: int(tools.WORKFLOW_STEP_DONE_EVENT), Payload: payload, }) logger.Info().Msgf("BookingWatchdog: booking %s stale → emitting FAILURE", b.GetID()) processedBookings.Store(b.GetID(), struct{}{}) } // ── Infra teardown helpers (called from nats.go on WORKFLOW_DONE_EVENT) ──────── // teardownAdmiraltyIfRemote triggers Admiralty TeardownAsTarget only when at // least one compute booking for the execution is on a remote peer. // Local executions do not involve Admiralty. func teardownAdmiraltyIfRemote(exec *workflow_execution.WorkflowExecution, selfPeerID string) { logger := oclib.GetLogger() res := oclib.NewRequest(oclib.LibDataEnum(oclib.BOOKING), "", selfPeerID, []string{}, nil). Search(&dbs.Filters{ And: map[string][]dbs.Filter{ "executions_id": {{Operator: dbs.EQUAL.String(), Value: exec.ExecutionsID}}, "resource_type": {{Operator: dbs.EQUAL.String(), Value: tools.COMPUTE_RESOURCE.EnumIndex()}}, }, }, "", false) if res.Err != "" || len(res.Data) == 0 { return } for _, dbo := range res.Data { b, ok := dbo.(*bookingmodel.Booking) if !ok { continue } if b.DestPeerID != selfPeerID { logger.Info().Msgf("InfraTeardown: Admiralty teardown exec=%s (remote peer=%s)", exec.ExecutionsID, b.DestPeerID) NewAdmiraltySetter(exec.ExecutionsID).TeardownAsTarget(context.Background(), selfPeerID) return // one teardown per execution is enough } } } // teardownMinioForExecution tears down all Minio configuration for the execution: // - storage bookings where this peer is the compute target → TeardownAsTarget // - storage bookings where this peer is the Minio source → TeardownAsSource func teardownMinioForExecution(ctx context.Context, executionsID string, localPeerID string) { logger := oclib.GetLogger() res := oclib.NewRequest(oclib.LibDataEnum(oclib.BOOKING), "", localPeerID, []string{}, nil). Search(&dbs.Filters{ And: map[string][]dbs.Filter{ "executions_id": {{Operator: dbs.EQUAL.String(), Value: executionsID}}, "resource_type": {{Operator: dbs.EQUAL.String(), Value: tools.LIVE_STORAGE.EnumIndex()}}, }, }, "", false) if res.Err != "" || len(res.Data) == 0 { return } for _, dbo := range res.Data { b, ok := dbo.(*bookingmodel.Booking) if !ok { continue } if b.DestPeerID == localPeerID { // This peer is the compute target: tear down K8s secret + configmap. logger.Info().Msgf("InfraTeardown: Minio target teardown exec=%s storage=%s", executionsID, b.ResourceID) event := minio.MinioDeleteEvent{ ExecutionsID: executionsID, MinioID: b.ResourceID, SourcePeerID: b.DestPeerID, DestPeerID: localPeerID, OriginID: "", } minio.NewMinioSetter(executionsID, b.ResourceID).TeardownAsTarget(ctx, event) } else { // This peer is the Minio source: revoke SA + remove execution bucket. logger.Info().Msgf("InfraTeardown: Minio source teardown exec=%s storage=%s", executionsID, b.ResourceID) event := minio.MinioDeleteEvent{ ExecutionsID: executionsID, MinioID: b.ResourceID, SourcePeerID: localPeerID, DestPeerID: b.DestPeerID, OriginID: "", } minio.NewMinioSetter(executionsID, b.ResourceID).TeardownAsSource(ctx, event) } } } // teardownPVCForExecution deletes all local PVCs provisioned for the execution. // It searches LIVE_STORAGE bookings and resolves the storage name via the live storage. func teardownPVCForExecution(ctx context.Context, executionsID string, localPeerID string) { logger := oclib.GetLogger() res := oclib.NewRequest(oclib.LibDataEnum(oclib.BOOKING), "", localPeerID, []string{}, nil). Search(&dbs.Filters{ And: map[string][]dbs.Filter{ "executions_id": {{Operator: dbs.EQUAL.String(), Value: executionsID}}, "resource_type": {{Operator: dbs.EQUAL.String(), Value: tools.LIVE_STORAGE.EnumIndex()}}, }, }, "", false) if res.Err != "" || len(res.Data) == 0 { return } for _, dbo := range res.Data { b, ok := dbo.(*bookingmodel.Booking) if !ok { continue } // Resolve storage name from live storage to compute the claim name. storageName := storage.ResolveStorageName(b.ResourceID, localPeerID) if storageName == "" { continue } logger.Info().Msgf("InfraTeardown: PVC teardown exec=%s storage=%s", executionsID, b.ResourceID) event := storage.PVCDeleteEvent{ ExecutionsID: executionsID, StorageID: b.ResourceID, StorageName: storageName, SourcePeerID: localPeerID, DestPeerID: b.DestPeerID, OriginID: "", } storage.NewPVCSetter(executionsID, b.ResourceID).TeardownAsSource(ctx, event) } }