Workflow lifecycle events + resource instance duration tracking

- Add WorkflowLifecycleEvent + StepMetric to tools/workflow_lifecycle.go
- Add WORKFLOW_STARTED_EVENT, WORKFLOW_STEP_DONE_EVENT, WORKFLOW_DONE_EVENT NATS methods
- Add ResourceInstance.UpdateAverageDuration to maintain the AverageDurationS running average (each sample is padded by 20% before averaging)
- Support Steps recap in WORKFLOW_DONE_EVENT for catch-up by oc-scheduler/oc-catalog

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
mr
2026-03-20 10:30:30 +01:00
parent 6e28dce02c
commit a62fbc6c7a
10 changed files with 96 additions and 22 deletions

View File

@@ -73,7 +73,7 @@ func (wfa *Booking) Check(id string, start time.Time, end *time.Time, parrallelA
// check if
if end == nil {
// No end date was supplied: fall back to a default-length booking window starting at start.
e := start.Add(time.Hour)
e := start.Add(5 * time.Minute)
end = &e
}
accessor := NewAccessor(nil)

View File

@@ -84,7 +84,7 @@ func generate(request *tools.APIRequest, shallow bool) (*Planner, error) {
end := bk.ExpectedEndDate
if end == nil {
e := bk.ExpectedStartDate.UTC().Add(time.Hour)
e := bk.ExpectedStartDate.UTC().Add(5 * time.Minute)
end = &e
}
@@ -126,7 +126,7 @@ func generate(request *tools.APIRequest, shallow bool) (*Planner, error) {
// If no capacity is known for this instance (never booked), it is fully available.
func (p *Planner) Check(resourceID string, instanceID string, req *ResourceRequest, start time.Time, end *time.Time) bool {
if end == nil {
e := start.Add(time.Hour)
e := start.Add(5 * time.Minute)
end = &e
}

View File

@@ -31,6 +31,8 @@ type ResourceInstanceITF interface {
GetPricingsProfiles(peerID string, groups []string) []pricing.PricingProfileITF
GetPeerGroups() ([]ResourcePartnerITF, []map[string][]string)
ClearPeerGroups()
GetAverageDurationS() float64
UpdateAverageDuration(actualS float64)
}
type ResourcePartnerITF interface {

View File

@@ -115,10 +115,10 @@ func (abs *PricedResource) GetExplicitDurationInS() float64 {
}
if abs.BookingConfiguration.ExplicitBookingDurationS == 0 {
if abs.BookingConfiguration.UsageEnd == nil && abs.BookingConfiguration.UsageStart == nil {
return time.Duration(1 * time.Hour).Seconds()
return (5 * time.Minute).Seconds()
}
if abs.BookingConfiguration.UsageEnd == nil {
add := abs.BookingConfiguration.UsageStart.Add(time.Duration(1 * time.Hour))
add := abs.BookingConfiguration.UsageStart.Add(5 * time.Minute)
abs.BookingConfiguration.UsageEnd = &add
}
return abs.BookingConfiguration.UsageEnd.Sub(*abs.BookingConfiguration.UsageStart).Seconds()

View File

@@ -82,7 +82,7 @@ func (a *PricedProcessingResource) GetExplicitDurationInS() float64 {
if a.IsService {
return -1
}
return time.Duration(1 * time.Hour).Seconds()
return (5 * time.Minute).Seconds()
}
return a.BookingConfiguration.UsageEnd.Sub(*a.BookingConfiguration.UsageStart).Seconds()
}

View File

@@ -66,7 +66,7 @@ func (r *AbstractResource) CanDelete() bool {
type AbstractInstanciatedResource[T ResourceInstanceITF] struct {
AbstractResource // AbstractResource contains the basic fields of an object (id, name)
Instances []T `json:"instances,omitempty" bson:"instances,omitempty"` // Bill is the bill of the resource // Bill is the bill of the resource
Instances []T `json:"instances,omitempty" bson:"instances,omitempty"`
}
func (abs *AbstractInstanciatedResource[T]) AddInstances(instance ResourceInstanceITF) {
@@ -109,6 +109,14 @@ func (abs *AbstractInstanciatedResource[T]) ConvertToPricedResource(t tools.Data
if selectedBookingModeIndex != nil && abs.AllowedBookingModes[booking.BookingMode(*selectedBookingModeIndex)] != nil {
variations = append(variations, abs.AllowedBookingModes[booking.BookingMode(*selectedBookingModeIndex)])
}
// Seed the booking configuration with the instance's historical average duration
// so GetExplicitDurationInS() returns a realistic default out of the box.
var bc *BookingConfiguration
if inst != nil {
if avg := inst.GetAverageDurationS(); avg > 0 {
bc = &BookingConfiguration{ExplicitBookingDurationS: avg}
}
}
return &PricedResource{
Name: abs.Name,
Logo: abs.Logo,
@@ -120,6 +128,7 @@ func (abs *AbstractInstanciatedResource[T]) ConvertToPricedResource(t tools.Data
SelectedPricing: profile,
Variations: variations,
CreatorID: abs.CreatorID,
BookingConfiguration: bc,
}, nil
}
@@ -196,6 +205,9 @@ type ResourceInstance[T ResourcePartnerITF] struct {
Outputs []models.Param `json:"outputs,omitempty" bson:"outputs,omitempty"`
Partnerships []T `json:"partnerships,omitempty" bson:"partnerships,omitempty"`
AverageDurationS float64 `json:"average_duration_s,omitempty" bson:"average_duration_s,omitempty"`
AverageDurationSamples int `json:"average_duration_samples,omitempty" bson:"average_duration_samples,omitempty"`
}
// TODO should kicks all selection
@@ -285,6 +297,17 @@ func (ri *ResourceInstance[T]) ClearPeerGroups() {
}
}
// GetAverageDurationS reports the running average duration, in seconds,
// currently stored on the instance. A value of zero means no average has
// been recorded yet.
func (ri *ResourceInstance[T]) GetAverageDurationS() float64 {
	average := ri.AverageDurationS
	return average
}
// UpdateAverageDuration folds one observed run time into the instance's
// running average (AverageDurationS) and increments AverageDurationSamples.
//
// actualS is the measured duration in seconds. It is padded by 20% before
// being averaged, so the stored value is an average of padded samples
// (presumably to leave headroom when bookings are seeded from it; confirm
// with the scheduler's expectations).
//
// Negative and NaN samples are ignored: a negative duration (for example the
// -1 sentinel used for open-ended services, or an end timestamp earlier than
// the start) is not a real measurement and would corrupt the average.
func (ri *ResourceInstance[T]) UpdateAverageDuration(actualS float64) {
	// The negated comparison is deliberate: it is also true for NaN,
	// which a plain `actualS < 0` would let through.
	if !(actualS >= 0) {
		return
	}

	// Padding applied to every sample before it enters the average.
	const safetyFactor = 1.20

	// A negative stored count can only come from corrupted persisted data;
	// treat it as "no samples" so the division below can never hit n+1 == 0.
	if ri.AverageDurationSamples < 0 {
		ri.AverageDurationSamples = 0
	}

	padded := actualS * safetyFactor
	n := float64(ri.AverageDurationSamples)
	ri.AverageDurationS = (ri.AverageDurationS*n + padded) / (n + 1)
	ri.AverageDurationSamples++
}
type ResourcePartnerShip[T pricing.PricingProfileITF] struct {
Namespace string `json:"namespace" bson:"namespace" default:"default-namespace"`
PeerGroups map[string][]string `json:"peer_groups,omitempty" bson:"peer_groups,omitempty"`

View File

@@ -606,7 +606,7 @@ func (wf *Workflow) Planify(start time.Time, end *time.Time, instances ConfigIte
}
return start.Add(time.Duration(d) * time.Second), priced.GetExplicitDurationInS(), nil
}, func(started time.Time, duration float64) (*time.Time, error) {
s := started.Add(time.Duration(duration))
s := started.Add(time.Duration(duration) * time.Second)
return &s, nil
})
if err != nil {
@@ -644,7 +644,7 @@ func (wf *Workflow) Planify(start time.Time, end *time.Time, instances ConfigIte
}
return start.Add(time.Duration(nearestStart) * time.Second), longestDuration, nil
}, func(started time.Time, duration float64) (*time.Time, error) {
s := started.Add(time.Duration(duration))
s := started.Add(time.Duration(duration) * time.Second)
return &s, nil
}); err != nil {
return false, 0, priceds, nil, err

View File

@@ -215,7 +215,14 @@ func (d *WorkflowExecution) bookEach(executionsID string, wfID string, dt tools.
if s := priced.GetLocationStart(); s != nil && s.After(time.Now()) {
start = *s
}
end := start.Add(time.Duration(priced.GetExplicitDurationInS()) * time.Second)
durationS := priced.GetExplicitDurationInS()
var endDate *time.Time
if durationS >= 0 {
e := start.Add(time.Duration(durationS) * time.Second)
endDate = &e
}
// durationS < 0 means the resource is a service (runs indefinitely):
// leave ExpectedEndDate nil so the booking is open-ended.
var m map[string]interface{}
b, _ := json.Marshal(priced)
json.Unmarshal(b, &m)
@@ -235,7 +242,7 @@ func (d *WorkflowExecution) bookEach(executionsID string, wfID string, dt tools.
WorkflowID: wfID,
ExecutionID: d.GetID(),
ExpectedStartDate: start,
ExpectedEndDate: &end,
ExpectedEndDate: endDate,
}
items = append(items, bookingItem)
d.PeerBookByGraph[priced.GetCreatorID()][itemID] = append(

View File

@@ -30,6 +30,7 @@ var meths = []string{"remove execution", "create execution", "planner execution"
"workflow event", "argo kube event", "create resource", "remove resource",
"propalgation event", "search event", "confirm event",
"considers event", "admiralty config event", "minio config event",
"workflow started event", "workflow step done event", "workflow done event",
}
const (
@@ -52,6 +53,13 @@ const (
CONSIDERS_EVENT
ADMIRALTY_CONFIG_EVENT
MINIO_CONFIG_EVENT
// Workflow lifecycle events emitted by oc-monitord.
// oc-scheduler listens to STARTED and DONE to maintain WorkflowExecution state.
// oc-datacenter listens to STEP_DONE and DONE to close bookings and tear down infra.
WORKFLOW_STARTED_EVENT
WORKFLOW_STEP_DONE_EVENT
WORKFLOW_DONE_EVENT
)
func (n NATSMethod) String() string {
@@ -62,7 +70,8 @@ func (n NATSMethod) String() string {
func NameToMethod(name string) NATSMethod {
for _, v := range [...]NATSMethod{REMOVE_EXECUTION, CREATE_EXECUTION, PLANNER_EXECUTION, DISCOVERY, WORKFLOW_EVENT, ARGO_KUBE_EVENT,
CREATE_RESOURCE, REMOVE_RESOURCE, PROPALGATION_EVENT, SEARCH_EVENT, CONFIRM_EVENT,
CONSIDERS_EVENT, ADMIRALTY_CONFIG_EVENT, MINIO_CONFIG_EVENT} {
CONSIDERS_EVENT, ADMIRALTY_CONFIG_EVENT, MINIO_CONFIG_EVENT,
WORKFLOW_STARTED_EVENT, WORKFLOW_STEP_DONE_EVENT, WORKFLOW_DONE_EVENT} {
if strings.Contains(strings.ToLower(v.String()), strings.ToLower(name)) {
return v
}

View File

@@ -0,0 +1,33 @@
package tools
import "time"
// StepMetric carries the outcome of one Argo step node as observed by oc-monitord.
// It is the element type of WorkflowLifecycleEvent.Steps, which holds the
// per-step recap sent with WORKFLOW_DONE_EVENT.
type StepMetric struct {
// BookingID is serialized as "booking_id".
// NOTE(review): presumably the booking this step ran against — confirm with the emitter.
BookingID string `json:"booking_id"`
// State is an integer state code; the enum it indexes is defined outside this file.
State int `json:"state"`
// RealStart is optional; it is omitted from the JSON payload when nil.
RealStart *time.Time `json:"real_start,omitempty"`
// RealEnd is optional; it is omitted from the JSON payload when nil.
RealEnd *time.Time `json:"real_end,omitempty"`
}
// WorkflowLifecycleEvent is the NATS payload emitted by oc-monitord on
// WORKFLOW_STARTED_EVENT, WORKFLOW_STEP_DONE_EVENT, and WORKFLOW_DONE_EVENT.
//
// - ExecutionID : WorkflowExecution UUID (used by oc-scheduler to update state)
// - ExecutionsID : run-group ID shared by all bookings of the same run
// - BookingID : non-empty only for WORKFLOW_STEP_DONE_EVENT
// - State : target state (enum index: SUCCESS=3, FAILURE=4, STARTED=2, …)
// - RealStart : actual start timestamp recorded by Argo (nil if unknown)
// - RealEnd : actual end timestamp recorded by Argo (nil for STARTED events)
// - Steps : non-nil only for WORKFLOW_DONE_EVENT — full recap of every step
// so oc-scheduler and oc-catalog can catch up if they missed STEP_DONE events
//
// BookingID, RealStart, RealEnd and Steps are tagged omitempty, so they are
// left out of the JSON payload when empty or nil.
//
// NOTE(review): the State values quoted above refer to an enum defined outside
// this file — confirm they still match it. Also confirm which services consume
// the Steps recap (oc-scheduler/oc-catalog are named here).
type WorkflowLifecycleEvent struct {
// Identifiers: always serialized, even when empty.
ExecutionID string `json:"execution_id"`
ExecutionsID string `json:"executions_id"`
// Set only for step-level events; omitted from JSON when empty.
BookingID string `json:"booking_id,omitempty"`
// Integer state code; always serialized.
State int `json:"state"`
// Optional timestamps; omitted from JSON when nil.
RealStart *time.Time `json:"real_start,omitempty"`
RealEnd *time.Time `json:"real_end,omitempty"`
// Per-step recap; omitted from JSON when nil or empty.
Steps []StepMetric `json:"steps,omitempty"`
}