diff --git a/models/booking/booking.go b/models/booking/booking.go index 4318e01..c7fa050 100644 --- a/models/booking/booking.go +++ b/models/booking/booking.go @@ -73,7 +73,7 @@ func (wfa *Booking) Check(id string, start time.Time, end *time.Time, parrallelA // check if if end == nil { // if no end... then Book like a savage - e := start.Add(time.Hour) + e := start.Add(5 * time.Minute) end = &e } accessor := NewAccessor(nil) diff --git a/models/booking/planner/planner.go b/models/booking/planner/planner.go index 74e71d3..f02c449 100644 --- a/models/booking/planner/planner.go +++ b/models/booking/planner/planner.go @@ -84,7 +84,7 @@ func generate(request *tools.APIRequest, shallow bool) (*Planner, error) { end := bk.ExpectedEndDate if end == nil { - e := bk.ExpectedStartDate.UTC().Add(time.Hour) + e := bk.ExpectedStartDate.UTC().Add(5 * time.Minute) end = &e } @@ -126,7 +126,7 @@ func generate(request *tools.APIRequest, shallow bool) (*Planner, error) { // If no capacity is known for this instance (never booked), it is fully available. func (p *Planner) Check(resourceID string, instanceID string, req *ResourceRequest, start time.Time, end *time.Time) bool { if end == nil { - e := start.Add(time.Hour) + e := start.Add(5 * time.Minute) end = &e } diff --git a/models/resources/interfaces.go b/models/resources/interfaces.go index 6e632fe..8a2f855 100755 --- a/models/resources/interfaces.go +++ b/models/resources/interfaces.go @@ -31,6 +31,8 @@ type ResourceInstanceITF interface { GetPricingsProfiles(peerID string, groups []string) []pricing.PricingProfileITF GetPeerGroups() ([]ResourcePartnerITF, []map[string][]string) ClearPeerGroups() + GetAverageDurationS() float64 + UpdateAverageDuration(actualS float64) } type ResourcePartnerITF interface { diff --git a/models/resources/priced_resource.go b/models/resources/priced_resource.go index 3be3bff..8d0d3de 100755 --- a/models/resources/priced_resource.go +++ b/models/resources/priced_resource.go @@ -115,10 +115,10 @@ func (abs *PricedResource) GetExplicitDurationInS() float64 { } if abs.BookingConfiguration.ExplicitBookingDurationS == 0 { if abs.BookingConfiguration.UsageEnd == nil && abs.BookingConfiguration.UsageStart == nil { - return time.Duration(1 * time.Hour).Seconds() + return (5 * time.Minute).Seconds() } if abs.BookingConfiguration.UsageEnd == nil { - add := abs.BookingConfiguration.UsageStart.Add(time.Duration(1 * time.Hour)) + add := abs.BookingConfiguration.UsageStart.Add(5 * time.Minute) abs.BookingConfiguration.UsageEnd = &add } return abs.BookingConfiguration.UsageEnd.Sub(*abs.BookingConfiguration.UsageStart).Seconds() diff --git a/models/resources/processing.go b/models/resources/processing.go index 3fe55ab..1d4d3bf 100755 --- a/models/resources/processing.go +++ b/models/resources/processing.go @@ -82,7 +82,7 @@ func (a *PricedProcessingResource) GetExplicitDurationInS() float64 { if a.IsService { return -1 } - return time.Duration(1 * time.Hour).Seconds() + return (5 * time.Minute).Seconds() } return a.BookingConfiguration.UsageEnd.Sub(*a.BookingConfiguration.UsageStart).Seconds() } diff --git a/models/resources/resource.go b/models/resources/resource.go index 7cb79b5..e1c255c 100755 --- a/models/resources/resource.go +++ b/models/resources/resource.go @@ -66,7 +66,7 @@ func (r *AbstractResource) CanDelete() bool { type AbstractInstanciatedResource[T ResourceInstanceITF] struct { AbstractResource // AbstractResource contains the basic fields of an object (id, name) - Instances []T `json:"instances,omitempty" bson:"instances,omitempty"` // Bill is the bill of the resource // Bill is the bill of the resource + Instances []T `json:"instances,omitempty" bson:"instances,omitempty"` } func (abs *AbstractInstanciatedResource[T]) AddInstances(instance ResourceInstanceITF) { @@ -109,17 +109,26 @@ func (abs *AbstractInstanciatedResource[T]) ConvertToPricedResource(t tools.Data if selectedBookingModeIndex != nil && abs.AllowedBookingModes[booking.BookingMode(*selectedBookingModeIndex)] != nil { variations = append(variations, abs.AllowedBookingModes[booking.BookingMode(*selectedBookingModeIndex)]) } + // Seed the booking configuration with the instance's historical average duration + // so GetExplicitDurationInS() returns a realistic default out of the box. + var bc *BookingConfiguration + if inst != nil { + if avg := inst.GetAverageDurationS(); avg > 0 { + bc = &BookingConfiguration{ExplicitBookingDurationS: avg} + } + } return &PricedResource{ - Name: abs.Name, - Logo: abs.Logo, - ResourceID: abs.UUID, - InstanceID: inst.GetID(), - ResourceType: t, - Quantity: 1, - InstancesRefs: instances, - SelectedPricing: profile, - Variations: variations, - CreatorID: abs.CreatorID, + Name: abs.Name, + Logo: abs.Logo, + ResourceID: abs.UUID, + InstanceID: inst.GetID(), + ResourceType: t, + Quantity: 1, + InstancesRefs: instances, + SelectedPricing: profile, + Variations: variations, + CreatorID: abs.CreatorID, + BookingConfiguration: bc, }, nil } @@ -196,6 +205,9 @@ type ResourceInstance[T ResourcePartnerITF] struct { Outputs []models.Param `json:"outputs,omitempty" bson:"outputs,omitempty"` Partnerships []T `json:"partnerships,omitempty" bson:"partnerships,omitempty"` + + AverageDurationS float64 `json:"average_duration_s,omitempty" bson:"average_duration_s,omitempty"` + AverageDurationSamples int `json:"average_duration_samples,omitempty" bson:"average_duration_samples,omitempty"` } // TODO should kicks all selection @@ -285,6 +297,17 @@ func (ri *ResourceInstance[T]) ClearPeerGroups() { } } +func (ri *ResourceInstance[T]) GetAverageDurationS() float64 { + return ri.AverageDurationS +} + +func (ri *ResourceInstance[T]) UpdateAverageDuration(actualS float64) { + buffered := actualS * 1.20 + n := float64(ri.AverageDurationSamples) + ri.AverageDurationS = (ri.AverageDurationS*n + buffered) / (n + 1) + ri.AverageDurationSamples++ +} + type ResourcePartnerShip[T pricing.PricingProfileITF] struct { Namespace string `json:"namespace" bson:"namespace" default:"default-namespace"` PeerGroups map[string][]string `json:"peer_groups,omitempty" bson:"peer_groups,omitempty"` diff --git a/models/workflow/workflow.go b/models/workflow/workflow.go index e1d94a3..e49417b 100644 --- a/models/workflow/workflow.go +++ b/models/workflow/workflow.go @@ -606,7 +606,7 @@ func (wf *Workflow) Planify(start time.Time, end *time.Time, instances ConfigIte } return start.Add(time.Duration(d) * time.Second), priced.GetExplicitDurationInS(), nil }, func(started time.Time, duration float64) (*time.Time, error) { - s := started.Add(time.Duration(duration)) + s := started.Add(time.Duration(duration) * time.Second) return &s, nil }) if err != nil { @@ -644,7 +644,7 @@ func (wf *Workflow) Planify(start time.Time, end *time.Time, instances ConfigIte } return start.Add(time.Duration(nearestStart) * time.Second), longestDuration, nil }, func(started time.Time, duration float64) (*time.Time, error) { - s := started.Add(time.Duration(duration)) + s := started.Add(time.Duration(duration) * time.Second) return &s, nil }); err != nil { return false, 0, priceds, nil, err diff --git a/models/workflow_execution/workflow_execution.go b/models/workflow_execution/workflow_execution.go index 98f7ad1..75de43c 100755 --- a/models/workflow_execution/workflow_execution.go +++ b/models/workflow_execution/workflow_execution.go @@ -215,7 +215,14 @@ func (d *WorkflowExecution) bookEach(executionsID string, wfID string, dt tools. if s := priced.GetLocationStart(); s != nil && s.After(time.Now()) { start = *s } - end := start.Add(time.Duration(priced.GetExplicitDurationInS()) * time.Second) + durationS := priced.GetExplicitDurationInS() + var endDate *time.Time + if durationS >= 0 { + e := start.Add(time.Duration(durationS) * time.Second) + endDate = &e + } + // durationS < 0 means the resource is a service (runs indefinitely): + // leave ExpectedEndDate nil so the booking is open-ended. var m map[string]interface{} b, _ := json.Marshal(priced) json.Unmarshal(b, &m) @@ -235,7 +242,7 @@ func (d *WorkflowExecution) bookEach(executionsID string, wfID string, dt tools. WorkflowID: wfID, ExecutionID: d.GetID(), ExpectedStartDate: start, - ExpectedEndDate: &end, + ExpectedEndDate: endDate, } items = append(items, bookingItem) d.PeerBookByGraph[priced.GetCreatorID()][itemID] = append( diff --git a/tools/nats_caller.go b/tools/nats_caller.go index 2c3c958..51041db 100644 --- a/tools/nats_caller.go +++ b/tools/nats_caller.go @@ -30,6 +30,7 @@ var meths = []string{"remove execution", "create execution", "planner execution" "workflow event", "argo kube event", "create resource", "remove resource", "propalgation event", "search event", "confirm event", "considers event", "admiralty config event", "minio config event", + "workflow started event", "workflow step done event", "workflow done event", } const ( @@ -52,6 +53,13 @@ const ( CONSIDERS_EVENT ADMIRALTY_CONFIG_EVENT MINIO_CONFIG_EVENT + + // Workflow lifecycle events emitted by oc-monitord. + // oc-scheduler listens to STARTED and DONE to maintain WorkflowExecution state. + // oc-datacenter listens to STEP_DONE and DONE to close bookings and tear down infra. + WORKFLOW_STARTED_EVENT + WORKFLOW_STEP_DONE_EVENT + WORKFLOW_DONE_EVENT ) func (n NATSMethod) String() string { @@ -62,7 +70,8 @@ func (n NATSMethod) String() string { func NameToMethod(name string) NATSMethod { for _, v := range [...]NATSMethod{REMOVE_EXECUTION, CREATE_EXECUTION, PLANNER_EXECUTION, DISCOVERY, WORKFLOW_EVENT, ARGO_KUBE_EVENT, CREATE_RESOURCE, REMOVE_RESOURCE, PROPALGATION_EVENT, SEARCH_EVENT, CONFIRM_EVENT, - CONSIDERS_EVENT, ADMIRALTY_CONFIG_EVENT, MINIO_CONFIG_EVENT} { + CONSIDERS_EVENT, ADMIRALTY_CONFIG_EVENT, MINIO_CONFIG_EVENT, + WORKFLOW_STARTED_EVENT, WORKFLOW_STEP_DONE_EVENT, WORKFLOW_DONE_EVENT} { if strings.Contains(strings.ToLower(v.String()), strings.ToLower(name)) { return v } diff --git a/tools/workflow_lifecycle.go b/tools/workflow_lifecycle.go new file mode 100644 index 0000000..99a5559 --- /dev/null +++ b/tools/workflow_lifecycle.go @@ -0,0 +1,33 @@ +package tools + +import "time" + +// StepMetric carries the outcome of one Argo step node as observed by oc-monitord. +// Embedded in WorkflowLifecycleEvent.Steps for the WORKFLOW_DONE_EVENT recap. +type StepMetric struct { + BookingID string `json:"booking_id"` + State int `json:"state"` + RealStart *time.Time `json:"real_start,omitempty"` + RealEnd *time.Time `json:"real_end,omitempty"` +} + +// WorkflowLifecycleEvent is the NATS payload emitted by oc-monitord on +// WORKFLOW_STARTED_EVENT, WORKFLOW_STEP_DONE_EVENT, and WORKFLOW_DONE_EVENT. +// +// - ExecutionID : WorkflowExecution UUID (used by oc-scheduler to update state) +// - ExecutionsID : run-group ID shared by all bookings of the same run +// - BookingID : non-empty only for WORKFLOW_STEP_DONE_EVENT +// - State : target state (enum index: SUCCESS=3, FAILURE=4, STARTED=2, …) +// - RealStart : actual start timestamp recorded by Argo (nil if unknown) +// - RealEnd : actual end timestamp recorded by Argo (nil for STARTED events) +// - Steps : non-nil only for WORKFLOW_DONE_EVENT — full recap of every step +// so oc-scheduler and oc-catalog can catch up if they missed STEP_DONE events +type WorkflowLifecycleEvent struct { + ExecutionID string `json:"execution_id"` + ExecutionsID string `json:"executions_id"` + BookingID string `json:"booking_id,omitempty"` + State int `json:"state"` + RealStart *time.Time `json:"real_start,omitempty"` + RealEnd *time.Time `json:"real_end,omitempty"` + Steps []StepMetric `json:"steps,omitempty"` +}