76 Commits

Author SHA1 Message Date
mr
df7ecacb75 Add new oclib struct 2026-03-12 11:27:49 +01:00
mr
ce3425e9eb Scheduled Complex Workflow Event 2026-03-12 11:26:24 +01:00
mr
dbc41f0326 Build up Clean Schedulerd + Lib Kube 2026-02-25 13:19:46 +01:00
mr
142a81197b brokeback alpine 2026-02-09 15:49:34 +01:00
mr
41b92cebec better tagging 2026-02-09 09:48:08 +01:00
mr
016f5017f1 publish registry 2026-02-05 12:09:46 +01:00
mr
2a76f2b9fb new lib 2026-02-03 16:13:48 +01:00
mr
07ca18e347 Conf Oclib Package Lightest 2026-02-03 16:12:57 +01:00
mr
c3f769ccd5 oclib-debug 2026-02-03 09:43:13 +01:00
mr
26e5db4572 oclib debug 2026-02-03 08:48:31 +01:00
mr
937c811e8d TODo 2026-02-02 14:42:34 +01:00
mr
a9c7bb66ee Merge branch 'feature/event' 2026-02-02 14:36:32 +01:00
mr
87951c8a77 update oclib 2026-02-02 14:36:00 +01:00
mr
1fda2b1334 add cluster name 2026-01-20 11:14:22 +01:00
mr
01098842dd improvment oclib 2026-01-15 13:33:33 +01:00
mr
d267c88fb5 Event Scheduling/Listening 2026-01-14 15:16:19 +01:00
mr
8bbbd1dcfb update 2025-11-20 16:30:58 +01:00
mr
7717a02f7f gitignore 2025-11-13 09:48:44 +01:00
pb
568d5bac30 Merge branch 'main' of https://cloud.o-forge.io/core/oc-schedulerd 2025-08-12 16:36:29 +02:00
pb
666ffe10f5 Mise à jour des fichiers go.mod et go.sum 2025-08-12 16:23:01 +02:00
mr
817f3eb468 oclib 2025-06-24 17:00:10 +02:00
mr
937116e9c5 oc lib -> nats search 2025-06-24 09:09:26 +02:00
mr
bd58016d4b Merge branch 'main' of https://cloud.o-forge.io/core/oc-schedulerd into main 2025-06-16 09:18:00 +02:00
mr
0bfa8fdad0 test 2025-06-16 09:17:41 +02:00
pb
54610f2dcc Changed the type of the variable storing the scheduled executions 2025-05-27 15:48:21 +02:00
pb
defdcf4264 changed how we store upcoming executions from slice to map 2025-05-20 20:06:48 +02:00
pb
bcc024caef updated the value of ExecutionID in LocalMonitor constructor 2025-05-12 12:35:49 +02:00
pb
6fce8f3aac removed some fmt.Print 2025-04-30 17:56:20 +02:00
mr
b6dea94196 Merge branch 'main' of https://cloud.o-forge.io/core/oc-schedulerd into main 2025-04-28 14:04:21 +02:00
mr
fba84ac612 schedulerd 2025-04-28 14:01:28 +02:00
pb
1b21c142f1 added a step to connect the monitord container to the 'oc' network 2025-04-25 15:42:21 +02:00
pb
6c3a20999b misc 2025-04-25 11:33:48 +02:00
pb
90fa0b8edd Divided the execution between local and container and created an interface responsible for preparing and launching the execution 2025-04-25 11:14:54 +02:00
pb
b43cb6d758 cleaned README, need to add more documentation in it 2025-04-18 11:27:24 +02:00
pb
d94f9603e8 added logging to see if monitord is running well 2025-04-17 19:59:33 +02:00
pb
494ba2f361 updated the value of ExecutionID in LocalMonitor object 2025-04-17 19:58:59 +02:00
pb
0f6213cd14 Added some test and logging on the path to execute monitord 2025-04-17 19:58:19 +02:00
pb
012c8a83cb checking execution more often for dev purposes 2025-04-17 19:57:55 +02:00
pb
a59a48cc66 modify Docker related files to adapt to new architecture 2025-04-17 18:39:34 +02:00
pb
e106d7e243 Merge branch 'main' of https://cloud.o-forge.io/core/oc-schedulerd into feature/order 2025-04-17 18:38:19 +02:00
pb
139b249a7c modify Docker related files to adapt to new architecture 2025-04-17 18:37:48 +02:00
mr
7a7364fb45 adjustment 2025-03-28 08:48:12 +01:00
mr
b4d57a8f2f oc-monitord 2025-03-24 09:23:03 +01:00
mr
907880bfd6 dev launch mode 2025-03-06 09:33:03 +01:00
mr
09b8046073 loki traefik + neo oclib 2025-02-21 11:24:56 +01:00
mr
90af391a0d oclib update 2025-02-19 12:06:56 +01:00
mr
88b2edee2f oclib update + casual debug 2025-02-18 15:00:17 +01:00
mr
88610c3dba Merge branch 'feature/order' into main 2025-02-18 08:31:54 +01:00
mr
b753523c35 Docker oc-schedulerd 2025-02-17 16:55:01 +01:00
mr
7246dea2b2 Mode in CMD 2025-02-14 11:59:32 +01:00
mr
6621d14d74 neo oclib 2025-02-06 08:54:00 +01:00
mr
e4305bdbd1 Adapt to new inputs env struct + instance of a resource 2025-02-05 08:38:50 +01:00
mr
0de2f9842b working oc-schedulerd 2025-01-17 17:21:17 +01:00
plm
8ddb119899 Use regular conf oclib library instead of custom implem to leverage env variable injection fonctionality; Dockerfile refactor 2025-01-14 18:33:51 +01:00
plm
7d78920304 Removing binary from conf 2025-01-13 12:15:44 +01:00
plm
b4cef41db2 Unique entry point + oclib dependency update 2025-01-13 12:15:08 +01:00
mr
8eeba712e7 oc-lib 2024-11-21 11:02:53 +01:00
mr
9a676297f6 networks 2024-11-14 09:31:22 +01:00
mr
066f6b54e0 missing loki debug 2024-11-14 09:26:52 +01:00
mr
3e36ed0ecf oclib 2024-11-08 14:04:40 +01:00
mr
2589451b07 neo oclib 2024-11-07 13:35:16 +01:00
mr
c3d553068c goooo 2024-10-15 11:18:40 +02:00
mr
04a157ba64 moood 2024-10-02 14:38:51 +02:00
mr
6ab737c915 test 2024-09-27 10:10:00 +02:00
pb
47570d9423 change to fit update to oclib 2024-09-04 17:34:05 +02:00
pb
2bc6e4327e change to fit update to oclib 2024-09-04 17:29:00 +02:00
pb
a69ecc4ab5 debug 2024-08-28 14:03:48 +02:00
mr
7206de35a8 Scheduler deleted 2024-08-22 10:51:07 +02:00
mr
20b5955ba9 Oclib major new version 2024-08-21 14:20:13 +02:00
mr
826650487b debug multiple 2024-08-20 16:14:10 +02:00
mr
c5d15d32da debug 2024-08-20 15:24:46 +02:00
mr
825c18b6d6 simplify 2024-08-20 09:23:05 +02:00
mr
e5cfd6f4fb minimize code + schedulerd naming + docker 2024-08-19 11:42:26 +02:00
pb
c710469881 Added grafana to compose and conf for easier setup 2024-08-13 11:15:18 +02:00
pb
41f93a292c don't commit binary 2024-08-12 16:19:27 +02:00
pb
5b626dcb21 for future k8s exec 2024-08-12 16:11:13 +02:00
33 changed files with 839 additions and 1360 deletions

2
.gitignore vendored
View File

@@ -8,7 +8,7 @@
*.dll *.dll
*.so *.so
*.dylib *.dylib
env.env
# Test binary, built with `go test -c` # Test binary, built with `go test -c`
*.test *.test

18
.vscode/launch.json vendored Normal file
View File

@@ -0,0 +1,18 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Launch Package",
"type": "go",
"request": "launch",
"mode": "auto",
"program": "${fileDirname}",
"env": {
"MONITOR_METHOD" : "local"
}
}
]
}

View File

@@ -1,18 +1,44 @@
FROM golang:alpine AS builder ARG KUBERNETES_HOST=${KUBERNETES_HOST:-"127.0.0.1"}
LABEL maintainer="IRT PFN" FROM golang:alpine AS deps
ENV DOCKER_ENVIRONMENT=true
ARG MONITORD_IMAGE
WORKDIR /app WORKDIR /app
COPY go.mod go.sum ./
RUN sed -i '/replace/d' go.mod
RUN go mod download -x
#----------------------------------------------------------------------------------------------
FROM golang:alpine AS builder
WORKDIR /app
COPY --from=deps /go/pkg /go/pkg
COPY --from=deps /app/go.mod /app/go.sum ./
COPY . . COPY . .
COPY conf/docker_scheduler.json /etc/oc/scheduler.json
RUN go build . RUN go build
#----------------------------------------------------------------------------------------------
FROM ${MONITORD_IMAGE:-oc-monitord}:latest AS monitord
FROM golang:alpine FROM golang:alpine
ENV KUBERNETES_SERVICE_HOST=$KUBERNETES_HOST
WORKDIR /app WORKDIR /app
COPY --from=builder /app/oc-scheduler . COPY docker_schedulerd.json /etc/oc/schedulerd.json
COPY conf/docker_scheduler.json /etc/oc/scheduler.json
ENTRYPOINT ["/app/oc-scheduler"] COPY --from=monitord /app/oc-monitord /usr/bin/oc-monitord
COPY --from=builder /app/oc-schedulerd /usr/bin/oc-schedulerd
COPY docker_schedulerd.json /etc/oc/schedulerd.json
# COPY argo_workflows .
EXPOSE 8080
ENTRYPOINT ["oc-schedulerd"]

36
Makefile Normal file
View File

@@ -0,0 +1,36 @@
.DEFAULT_GOAL := all
build: clean
go build .
build-monitord:
go build -o ../oc-monitord ../oc-monitord
run:
./oc-schedulerd
clean:
rm -rf oc-schedulerd
docker:
DOCKER_BUILDKIT=1 docker build -t oc-schedulerd --build-arg MONITORD_IMAGE=oc-monitord -f Dockerfile . --build-arg=HOST=$(HOST) --build-arg=KUBERNETES_HOST=$(KUBERNETES_HOST) --build-arg=KUBERNETES_SERVICE_PORT=$(KUBERNETES_SERVICE_PORT) --build-arg=KUBE_CA=$(KUBE_CA) --build-arg=KUBE_CERT=$(KUBE_CERT) --build-arg=KUBE_DATA=$(KUBE_DATA)
docker tag oc-schedulerd opencloudregistry/oc-schedulerd:latest
publish-kind:
kind load docker-image opencloudregistry/oc-schedulerd:latest --name $(CLUSTER_NAME) | true
publish-registry:
docker push opencloudregistry/oc-schedulerd:latest
docker-deploy:
docker compose up -d
run-docker: docker publish-kind publish-registry docker-deploy
all: docker publish-kind
ci: docker publish-registry
dev: build-monitord build run
.PHONY: build run clean docker publish-kind publish-registry

View File

@@ -1,18 +1,34 @@
# oc-scheduler # oc-scheduler
OC-Scheduler retrieves the content of submitted workflows and prepare them to be executed. oc-schedulerd is a daemon performing to actions at the same time :
- subscribing to the local NATS instance' custom channels for message commanding either the scheduling or the removing of an execution.
- polling oc-catalog for scheduled executions
## Parsing Depending on the environment it is running in, oc-schedulerd will either :
- execute the oc-monitord binary
- run an oc-monitord container
From a workflow's name we retrieve the xml graph associated and parse it in order to create the object representing each componant. ## Parameters
Each object is linked to another, represented by a links object with the two object IDs has attributes.
TODO : oc-schedulerd uses json files to load its configuration. The template for this configuration file is below
- [x] Retrieve the user input's for each component.
## Organising ```json
{
"LOKI_URL" : "http://[IP/URL]:3100",
"MONGO_URL":"mongodb://[IP/URL]:27017/",
"NATS_URL":"nats://[IP/URL]:4222",
"MONGO_DATABASE":"",
"MONITORD_PATH": "",
"KUBERNETES_SERVICE_HOST" : "[IP/URL]",
"MONITOR_MODE": "",
"KUBE_CA": "",
"KUBE_CERT": "",
"KUBE_DATA": ""
}
```
TODO : **monitor_mode** : should be either "local","container", ""
- [ ] create an argo file from the graph/worfklow
- [ ] Create a different entry for each component ## TODO
- [ ] execute each element in the right order
- [ ] Implement the discovery of current mode : local, local in container, as a container

View File

@@ -8,55 +8,46 @@ import (
) )
type Config struct { type Config struct {
OcCatalogUrl string MonitorPath string
MongoUrl string Logs string
DBName string KubeHost string
Logs string KubePort string
LokiUrl string KubeCA string
NatsUrl string KubeCert string
KubeData string
KubeNamespace string
KubeImage string
} }
var instance *Config var instance *Config
var once sync.Once var once sync.Once
const defaultConfigFile = "/etc/oc/scheduler.json" const defaultConfigFile = "/etc/oc/schedulerd.json"
const localConfigFile = "./conf/local_scheduler.json"
func init() {
func init(){
configFile := "" configFile := ""
var o *onion.Onion var o *onion.Onion
l3 := onion.NewEnvLayerPrefix("_", "OCSCHEDULER_") l3 := onion.NewEnvLayerPrefix("_", "OCSCHEDULERD_")
l2, err := onion.NewFileLayer(defaultConfigFile, nil) l2, err := onion.NewFileLayer(defaultConfigFile, nil)
if err == nil { if err == nil {
logs.Info("Config file found : " + defaultConfigFile) logs.Info("Config file found : " + defaultConfigFile)
configFile = defaultConfigFile configFile = defaultConfigFile
} }
l1, err := onion.NewFileLayer(localConfigFile, nil) if configFile == "" || l2 == nil {
if err == nil {
logs.Info("Local config file found " + localConfigFile + ", overriding default file")
configFile = localConfigFile
}
if configFile == "" {
logs.Info("No config file found, using env") logs.Info("No config file found, using env")
o = onion.New(l3) o = onion.New(l3)
} else if l1 == nil && l2 == nil { } else {
o = onion.New(l1, l2, l3)
} else if l1 == nil {
o = onion.New(l2, l3) o = onion.New(l2, l3)
} else if l2 == nil {
o = onion.New(l1, l3)
} }
GetConfig().MonitorPath = o.GetStringDefault("MONITORD_PATH", "../oc-monitord/oc-monitord")
GetConfig().OcCatalogUrl = o.GetStringDefault("oc-catalog", "https://localhost:49618") GetConfig().KubeHost = o.GetStringDefault("KUBE_HOST", "")
GetConfig().Logs = o.GetStringDefault("loglevel", "info") GetConfig().KubePort = o.GetStringDefault("KUBE_PORT", "6443")
GetConfig().LokiUrl = o.GetStringDefault("loki_url","http://127.0.0.1:3100") GetConfig().KubeCA = o.GetStringDefault("KUBE_CA", "")
GetConfig().NatsUrl = o.GetStringDefault("nats_url","http://127.0.0.1:4222") GetConfig().KubeCert = o.GetStringDefault("KUBE_CERT", "")
GetConfig().MongoUrl = o.GetStringDefault("mongo_url","mongodb://127.0.0.1:27017") GetConfig().KubeData = o.GetStringDefault("KUBE_DATA", "")
GetConfig().DBName = o.GetStringDefault("database_name","DC_myDC") GetConfig().KubeNamespace = o.GetStringDefault("KUBE_NAMESPACE", "default")
GetConfig().KubeImage = o.GetStringDefault("KUBE_IMAGE", "oc-monitord")
} }
func GetConfig() *Config { func GetConfig() *Config {

View File

@@ -1,4 +0,0 @@
{
"oc-catalog" : "http://oc-catalog:49618/",
"loki_url" : "http://192.168.1.18:3100"
}

View File

@@ -0,0 +1,8 @@
datasources:
- name: Loki
type: loki
access: proxy
url: http://loki:3100
isDefault: true
jsonData:
httpMethod: POST

View File

@@ -1,5 +0,0 @@
{
"oc-catalog" : "http://localhost:49618/",
"logs" : "",
"mongo_url": "mongodb://127.0.0.1:27017"
}

View File

@@ -0,0 +1,143 @@
package daemons
import (
"context"
"encoding/base64"
"fmt"
"oc-schedulerd/conf"
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/models/common/enum"
"github.com/rs/zerolog"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
type ContainerMonitor struct {
Monitor LocalMonitor
KubeCA string
KubeCert string
KubeData string
KubeHost string
KubePort string
KubeNamespace string
KubeImage string
}
func NewContainerMonitor(UUID string, peerId string, duration int) Executor {
return &ContainerMonitor{
Monitor: LocalMonitor{
ExecutionID: UUID,
PeerID: peerId,
Duration: duration,
LokiUrl: oclib.GetConfig().LokiUrl,
MongoUrl: oclib.GetConfig().MongoUrl,
DBName: oclib.GetConfig().MongoDatabase,
},
KubeCA: conf.GetConfig().KubeCA,
KubeCert: conf.GetConfig().KubeCert,
KubeData: conf.GetConfig().KubeData,
KubeHost: conf.GetConfig().KubeHost,
KubePort: conf.GetConfig().KubePort,
KubeNamespace: conf.GetConfig().KubeNamespace,
KubeImage: conf.GetConfig().KubeImage,
}
}
func (cm *ContainerMonitor) PrepareMonitorExec() []string {
args := []string{
"-e", cm.Monitor.ExecutionID,
"-p", cm.Monitor.PeerID,
"-u", cm.Monitor.LokiUrl,
"-m", cm.Monitor.MongoUrl,
"-d", cm.Monitor.DBName,
"-M", "kubernetes",
"-H", cm.KubeHost,
"-P", cm.KubePort,
"-C", cm.KubeCert,
"-D", cm.KubeData,
"-c", cm.KubeCA,
}
if cm.Monitor.Duration > 0 {
args = append(args, "-t", fmt.Sprintf("%d", cm.Monitor.Duration))
}
return args
}
func (cm *ContainerMonitor) failExec(execID string, l zerolog.Logger, msg string) {
l.Error().Msg(msg)
oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), nil).UpdateOne(map[string]interface{}{
"state": enum.FAILURE.EnumIndex(),
}, execID)
}
func (cm *ContainerMonitor) LaunchMonitor(args []string, execID string, l zerolog.Logger) {
ca, err := base64.StdEncoding.DecodeString(cm.KubeCA)
if err != nil {
cm.failExec(execID, l, "Failed to decode KubeCA: "+err.Error())
return
}
cert, err := base64.StdEncoding.DecodeString(cm.KubeCert)
if err != nil {
cm.failExec(execID, l, "Failed to decode KubeCert: "+err.Error())
return
}
key, err := base64.StdEncoding.DecodeString(cm.KubeData)
if err != nil {
cm.failExec(execID, l, "Failed to decode KubeData: "+err.Error())
return
}
cfg := &rest.Config{
Host: "https://" + cm.KubeHost + ":" + cm.KubePort,
TLSClientConfig: rest.TLSClientConfig{
CAData: ca,
CertData: cert,
KeyData: key,
},
}
clientset, err := kubernetes.NewForConfig(cfg)
if err != nil {
cm.failExec(execID, l, "Failed to build Kubernetes client: "+err.Error())
return
}
backoffLimit := int32(0)
job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "oc-monitord-" + execID,
Namespace: cm.KubeNamespace,
},
Spec: batchv1.JobSpec{
BackoffLimit: &backoffLimit,
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
RestartPolicy: corev1.RestartPolicyNever,
Containers: []corev1.Container{
{
Name: "oc-monitord",
Image: cm.KubeImage,
Args: args,
},
},
},
},
},
}
_, err = clientset.BatchV1().Jobs(cm.KubeNamespace).Create(context.Background(), job, metav1.CreateOptions{})
if err != nil {
cm.failExec(execID, l, "Failed to create Kubernetes Job: "+err.Error())
return
}
l.Info().Msg("Started Kubernetes Job oc-monitord-" + execID)
}

View File

@@ -1,67 +0,0 @@
package daemons
// type manifestValues struct{
// ARGO_FILE string
// LOKI_URL string
// CONTAINER_NAME string
// }
// manifest, err := em.CreateManifest(booking, argo_file_path)
// if err != nil {
// logger.Logger.Error().Msg("Could not create manifest " + err.Error())
// }
// // launch a pod that contains oc-monitor, give it the name of the workflow
// cmd := exec.Command("kubectl","apply","-f", "manifests/"+manifest)
// output , err := cmd.CombinedOutput()
// if err != nil {
// logger.Logger.Error().Msg("failed to create new pod for " + booking.Workflow + " :" + err.Error())
// return
// }
// func (*ExecutionManager) CreateManifest(booking models.Booking, argo_file string) (manifest_filepath string, err error) {
// var filled_template bytes.Buffer
// manifest_file, err := os.ReadFile("conf/monitor_pod_template.yml")
// if err != nil {
// logger.Logger.Error().Msg("Could not open the k8s template file for " + booking.Workflow)
// return "", err
// }
// container_name := getContainerName(argo_file)
// tmpl, err := template.New("manifest_template").Parse(string(manifest_file))
// if err != nil {
// logger.Logger.Error().Msg(err.Error())
// return "", err
// }
// manifest_data := manifestValues{
// ARGO_FILE: argo_file,
// LOKI_URL: conf.GetConfig().Logs,
// CONTAINER_NAME: container_name,
// }
// err = tmpl.Execute(&filled_template, manifest_data)
// if err != nil {
// logger.Logger.Error().Msg("Could not complete manifest template for " + booking.Workflow)
// return "", err }
// manifest_filepath = booking.Workflow + "_manifest.yaml"
// err = os.WriteFile("manifests/" + manifest_filepath, filled_template.Bytes(),0644)
// if err != nil {
// logger.Logger.Error().Msg("Could not write the YAML file for " + booking.Workflow + "'s manifest")
// return "", err
// }
// return manifest_filepath, nil
// }
// func getContainerName(argo_file string) string {
// regex := "([a-zA-Z]+-[a-zA-Z]+)"
// re := regexp.MustCompile(regex)
// container_name := re.FindString(argo_file)
// return container_name
// }

View File

@@ -1,47 +1,79 @@
package daemons package daemons
import ( import (
"oc-scheduler/conf" "fmt"
"oc-scheduler/logger" "oc-schedulerd/conf"
"os/exec" "os/exec"
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/models/common/enum"
"github.com/rs/zerolog"
) )
type LocalMonitor struct{ type LocalMonitor struct {
LokiURL string ExecutionID string
KubeURL string PeerID string
WorkflowName string Duration int
LokiUrl string
MongoUrl string
DBName string
} }
func (lm *LocalMonitor) LaunchLocalMonitor (){ func NewLocalMonitor(UUID string, peerId string, duration int) Executor {
if (lm.LokiURL == "" || lm.KubeURL == "" || lm.WorkflowName == ""){ return &LocalMonitor{
logger.Logger.Error().Msg("Missing parameter in LocalMonitor") ExecutionID: UUID,
} PeerID: peerId,
Duration: duration,
// For dev purposes, in prod KubeURL must be a kube API's URL LokiUrl: oclib.GetConfig().LokiUrl,
if(lm.KubeURL == "localhost"){ MongoUrl: oclib.GetConfig().MongoUrl,
lm.execLocalKube() DBName: oclib.GetConfig().MongoDatabase,
} else{
lm.execRemoteKube()
} }
} }
func (lm *LocalMonitor) execLocalKube (){ // func (lm *LocalMonitor) LaunchLocalMonitor() {
// kube_url := "" // if lm.ExecutionID == "" {
cmd := exec.Command("../oc-monitor/oc-monitor", "-w",lm.WorkflowName, "-u", lm.LokiURL, "-m", conf.GetConfig().MongoUrl,"-d", conf.GetConfig().DBName) // lm.Logger.Error().Msg("Missing parameter in LocalMonitor")
// cmd_ls := exec.Command("ls", "../oc-monitor") // }
err := cmd.Start()
// output, err := cmd_ls.CombinedOutput() // }
if err !=nil {
logger.Logger.Error().Msg("Could not start oc-monitor for " + lm.WorkflowName + " : " + err.Error()) func (lm *LocalMonitor) PrepareMonitorExec() []string {
args := []string{
"-e", lm.ExecutionID,
"-p", lm.PeerID,
"-u", lm.LokiUrl,
"-m", lm.MongoUrl,
"-d", lm.DBName,
} }
if lm.Duration > 0 {
args = append(args, "-t", fmt.Sprintf("%d", lm.Duration))
}
return args
} }
func (lm *LocalMonitor) LaunchMonitor(args []string, execID string, l zerolog.Logger) {
cmd := exec.Command(conf.GetConfig().MonitorPath, args...)
fmt.Printf("Command : %v\n", cmd)
func (lm *LocalMonitor) execRemoteKube (){ stdoutMonitord, err := cmd.StdoutPipe()
if err != nil {
l.Error().Msg("Could not retrieve stdoutpipe for execution of oc-monitord" + err.Error())
oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), nil).UpdateOne(map[string]interface{}{
"state": enum.FAILURE.EnumIndex(),
}, execID)
return
}
err = cmd.Start()
if err != nil {
l.Error().Msg("Could not start oc-monitor for " + lm.ExecutionID + " : " + err.Error())
oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), nil).UpdateOne(map[string]interface{}{
"state": enum.FAILURE.EnumIndex(),
}, execID)
return
}
logExecution(stdoutMonitord, l)
} }
func (lm *LocalMonitor) todo (){
}

View File

@@ -1,72 +1,93 @@
package daemons package daemons
import ( import (
"oc-scheduler/conf"
"oc-scheduler/logger"
"oc-scheduler/models"
"os"
"time" "time"
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/models/common/enum"
workflow_execution "cloud.o-forge.io/core/oc-lib/models/workflow_execution"
"go.mongodb.org/mongo-driver/bson/primitive"
) )
type ExecutionManager struct { var Executions = ScheduledExecution{Execs: map[string]workflow_execution.WorkflowExecution{}}
bookings *models.ScheduledBooking
executions []models.Booking
}
type ExecutionManager struct{}
// Loop every second on the Execution's list and move the Execution that must start to a new list
func (em *ExecutionManager) SetBookings(b *models.ScheduledBooking){
em.bookings = b
}
// Loop every second on the booking's list and move the booking that must start to a new list
// that will be looped over to start them // that will be looped over to start them
func (em *ExecutionManager) RetrieveNextExecutions(){ func (em *ExecutionManager) RetrieveNextExecutions() {
logger := oclib.GetLogger()
for {
Executions.Mu.Lock()
if len(Executions.Execs) > 0 {
executions := Executions.Execs
orderedExec := map[int]map[string]workflow_execution.WorkflowExecution{}
for execId, exec := range executions {
if orderedExec[exec.Priority] == nil {
orderedExec[exec.Priority] = map[string]workflow_execution.WorkflowExecution{}
}
orderedExec[exec.Priority][execId] = exec
}
for i := range []int{7, 6, 5, 4, 3, 2, 1, 0} { // priority in reversed
if orderedExec[i] == nil {
continue
}
for execId, exec := range orderedExec[i] {
if(em.bookings == nil){ if i == 0 && em.isAStartingExecutionBeforeEnd(&exec) { // BEST EFFORT exception
logger.Logger.Error().Msg("booking has not been set in the exection manager") continue
return }
} if exec.ExecDate.Before(time.Now().UTC()) {
logger.Info().Msg("Will execute " + execId + " soon")
for(true){ go em.executeExecution(&exec)
logger.Logger.Debug().Msg("New loop") delete(executions, execId)
em.bookings.Mu.Lock() }
bookings := em.bookings.Bookings
if (len(bookings) > 0){
for i := len( bookings) - 1 ; i >= 0 ; i--{
logger.Logger.Debug().Msg("It should start at " + bookings[i].Start.String() + " and it is now " + time.Now().UTC() .String())
if (bookings[i].Start.Before(time.Now().UTC())){
logger.Logger.Info().Msg("Will execute " + bookings[i].Workflow + " soon")
go em.executeBooking(bookings[i])
bookings = append(bookings[:i], bookings[i+1:]...)
em.bookings.Bookings = bookings
} }
} }
}
em.bookings.Mu.Unlock() }
Executions.Mu.Unlock()
time.Sleep(time.Second) time.Sleep(time.Second)
} }
} }
func (em *ExecutionManager) executeBooking(booking models.Booking){ func (em *ExecutionManager) isAStartingExecutionBeforeEnd(execution *workflow_execution.WorkflowExecution) bool {
access := workflow_execution.NewAccessor(nil)
l, _, err := access.Search(&dbs.Filters{
// start execution And: map[string][]dbs.Filter{
// create the yaml that describes the pod : filename, path/url to Loki "execution_date": {{Operator: dbs.LTE.String(), Value: primitive.NewDateTimeFromTime(*execution.EndDate)}},
"state": {{Operator: dbs.EQUAL.String(), Value: enum.SCHEDULED}},
}, // TODO later should refine on each endpoint
exec_method := os.Getenv("MONITOR_METHOD") }, "", false)
if err != nil && len(l) == 0 {
if exec_method == "local"{ return false
logger.Logger.Debug().Msg("Executing oc-monitor localy")
monitor := LocalMonitor{LokiURL: conf.GetConfig().LokiUrl,KubeURL: "localhost",WorkflowName: booking.Workflow,}
monitor.LaunchLocalMonitor()
}else{
logger.Logger.Error().Msg("TODO : executing oc-monitor in a k8s")
} }
return true
} }
func (em *ExecutionManager) executeExecution(execution *workflow_execution.WorkflowExecution) {
// start execution
// create the yaml that describes the pod : filename, path/url to Loki
var executor Executor
// exec_method := os.Getenv("MONITOR_METHOD")
logger := oclib.GetLogger()
duration := 0
if execution.EndDate != nil {
duration = int(execution.EndDate.Sub(execution.ExecDate).Seconds())
}
executor = NewContainerMonitor(execution.UUID, execution.CreatorID, duration)
if executor == nil {
logger.Fatal().Msg("Could not create executor")
oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), nil).UpdateOne(map[string]interface{}{
"state": enum.FAILURE.EnumIndex(),
}, execution.GetID())
return
}
args := executor.PrepareMonitorExec()
executor.LaunchMonitor(args, execution.GetID(), logger)
}

21
daemons/interface.go Normal file
View File

@@ -0,0 +1,21 @@
package daemons
import (
"bufio"
"io"
"github.com/rs/zerolog"
)
type Executor interface {
PrepareMonitorExec() []string
LaunchMonitor(args []string, execID string, l zerolog.Logger)
}
func logExecution(reader io.ReadCloser, l zerolog.Logger) {
scanner := bufio.NewScanner(reader)
for scanner.Scan() {
output := scanner.Text()
l.Debug().Msg(output)
}
}

View File

@@ -3,152 +3,124 @@ package daemons
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"sync"
"time" "time"
"oc-scheduler/logger"
"oc-scheduler/models"
oclib "cloud.o-forge.io/core/oc-lib" oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/dbs" "cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/resources/native_tools"
"cloud.o-forge.io/core/oc-lib/models/workflow_execution" "cloud.o-forge.io/core/oc-lib/models/workflow_execution"
"github.com/nats-io/nats.go" "cloud.o-forge.io/core/oc-lib/tools"
"github.com/google/uuid"
"github.com/rs/zerolog"
"go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/bson/primitive"
) )
type ScheduledExecution struct {
Execs map[string]workflow_execution.WorkflowExecution
Mu sync.Mutex
}
func (sb *ScheduledExecution) DeleteSchedules(resp tools.NATSResponse) {
var m map[string]string
json.Unmarshal(resp.Payload, &m)
Executions.Mu.Lock()
defer Executions.Mu.Unlock()
delete(sb.Execs, m["id"])
}
func (sb *ScheduledExecution) AddSchedules(new_executions []*workflow_execution.WorkflowExecution, logger zerolog.Logger) {
Executions.Mu.Lock()
defer Executions.Mu.Unlock()
for _, exec := range new_executions {
fmt.Println("Adding "+exec.UUID, !sb.execIsSet(exec))
if !sb.execIsSet(exec) {
sb.Execs[exec.UUID] = *exec
oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), nil).UpdateOne(map[string]interface{}{
"state": enum.SCHEDULED.EnumIndex(),
}, exec.GetID())
}
}
}
func (sb *ScheduledExecution) execIsSet(exec *workflow_execution.WorkflowExecution) bool {
if _, ok := sb.Execs[exec.UUID]; ok {
return true
}
return false
}
// NATS daemon listens to subject " workflowsUpdate " // NATS daemon listens to subject " workflowsUpdate "
// workflowsUpdate messages must be formatted following this pattern '{"workflow" : "", "start_date" : "", "stop_date" : "" }' // workflowsUpdate messages must be formatted following this pattern '{"workflow" : "", "start_date" : "", "stop_date" : "" }'
type ScheduleManager struct { type ScheduleManager struct {
Api_url string Logger zerolog.Logger
bookings *models.ScheduledBooking
ws models.HttpQuery
}
func (s *ScheduleManager) SetBookings(b *models.ScheduledBooking){
s.bookings = b
}
// Goroutine listening to a NATS server for updates
// on workflows' scheduling. Messages must contain
// workflow execution ID, to allow retrieval of execution infos
func (s *ScheduleManager) ListenForWorkflowSubmissions(){
if(s.bookings == nil){
logger.Logger.Error().Msg("booking has not been set in the schedule manager")
}
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
logger.Logger.Error().Msg("Could not connect to NATS")
return
}
defer nc.Close()
ch := make(chan *nats.Msg, 64)
subs , err := nc.ChanSubscribe("workflowsUpdate", ch)
if err != nil {
logger.Logger.Error().Msg("Error listening to NATS")
}
defer subs.Unsubscribe()
for msg := range(ch){
fmt.Println("Waiting...")
map_mess := retrieveMapFromSub(msg.Data)
s.bookings.Mu.Lock()
wf_exec := getWorkflowExecution(map_mess["workflow"])
s.bookings.AddSchedule(models.Booking{Workflow: map_mess["workflow"], Start: *wf_exec.ExecDate, Stop: *wf_exec.EndDate })
s.bookings.Mu.Unlock()
}
}
func getWorkflowExecution(exec_id string) *workflow_execution.WorkflowExecution {
res := oclib.LoadOne(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION),exec_id)
if res.Code != 200 {
logger.Logger.Error().Msg("Could not retrieve workflow ID from execution ID " + exec_id)
return nil
}
wf_exec := res.ToWorkflowExecution()
return wf_exec
}
// At the moment very simplistic, but could be useful if we send bigger messages
func retrieveMapFromSub(message []byte) (result_map map[string]string) {
json.Unmarshal(message, &result_map)
return
} }
// Used at launch of the component to retrieve the next scheduled workflows // Used at launch of the component to retrieve the next scheduled workflows
// and then every X minutes in case some workflows were scheduled before launch // and then every X minutes in case some workflows were scheduled before launch
func (s *ScheduleManager) SchedulePolling (){ func (s *ScheduleManager) SchedulePolling() {
var sleep_time float64 = 1 var sleep_time float64 = 20
for(true){ for {
s.getNextScheduledWorkflows(3) s.GetNextScheduledWorkflows(tools.NATSResponse{})
s.Logger.Info().Msg("Current list of schedules -------> " + fmt.Sprintf("%v", len(Executions.Execs)))
logger.Logger.Info().Msg("Current list of schedules") time.Sleep(time.Second * time.Duration(sleep_time))
fmt.Println(s.bookings.Bookings)
time.Sleep(time.Minute * time.Duration(sleep_time))
} }
} }
func (s *ScheduleManager) getWorfklowExecution(from time.Time, to time.Time) (exec_list []workflow_execution.WorkflowExecution, err error) { func (s *ScheduleManager) getExecution(from time.Time, to time.Time) (exec_list []*workflow_execution.WorkflowExecution, err error) {
fmt.Printf("Getting workflows execution from %s to %s \n", from.String(), to.String())
f := dbs.Filters{ f := dbs.Filters{
And: map[string][]dbs.Filter{ And: map[string][]dbs.Filter{
"execution_date" : {{Operator : dbs.GTE.String(), Value: primitive.NewDateTimeFromTime(from)}, {Operator: dbs.LTE.String(), Value: primitive.NewDateTimeFromTime(to)}}, "execution_date": {{Operator: dbs.GTE.String(), Value: primitive.NewDateTimeFromTime(from)}, {Operator: dbs.LTE.String(), Value: primitive.NewDateTimeFromTime(to)}},
"state": {{Operator: dbs.EQUAL.String(), Value: 1}}, "state": {{Operator: dbs.EQUAL.String(), Value: enum.SCHEDULED}},
}, },
} }
res := oclib.Search(&f,"",oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION)) res := oclib.NewRequest(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), "", "", []string{}, nil).Search(&f, "", false)
if res.Code != 200 { if res.Code != 200 {
logger.Logger.Error().Msg("Error loading") s.Logger.Error().Msg("Error loading " + res.Err)
return nil, nil
}
for _, exec := range(res.Data){
lib_data := oclib.LoadOne(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION),exec.GetID())
exec_obj := lib_data.ToWorkflowExecution()
exec_list = append(exec_list, *exec_obj)
}
return exec_list, nil
}
// TODO : refactor to implement oclib.Search
func (s *ScheduleManager) getNextScheduledWorkflows(minutes float64) {
start := time.Now().UTC()
end := start.Add(time.Minute * time.Duration(minutes)).UTC()
fmt.Printf("Getting workflows execution from %s to %s \n", start.String(), end.String())
next_wf_exec, err := s.getWorfklowExecution(start,end)
if err != nil {
logger.Logger.Error().Msg("Could not retrieve next schedules")
return return
} }
for _, exec := range res.Data {
exec_list = append(exec_list, exec.(*workflow_execution.WorkflowExecution))
s.bookings.Mu.Lock()
defer s.bookings.Mu.Unlock()
for _, exec := range(next_wf_exec){
exec_start := exec.ExecDate
exec_stop := exec.EndDate
s.bookings.AddSchedule(models.Booking{Workflow: exec.UUID, Start: *exec_start, Stop: *exec_stop})
} }
fmt.Println("Found "+fmt.Sprintf("%v", len(exec_list))+" workflows", res)
return
} }
func (s *ScheduleManager) ExecuteWorkflow(resp tools.NATSResponse) {
var event native_tools.WorkflowEventParams
json.Unmarshal(resp.Payload, &event)
access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKFLOW), nil)
if d := access.LoadOne(fmt.Sprintf("%v", event.WorkflowResourceID)); d.Err == "" {
eventExec := &workflow_execution.WorkflowExecution{
WorkflowID: d.Data.GetID(),
ExecDate: time.Now(),
ExecutionsID: uuid.New().String(),
State: enum.SCHEDULED,
}
exec := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), nil).StoreOne(eventExec.Serialize(eventExec))
if execc := exec.ToWorkflowExecution(); execc != nil {
Executions.AddSchedules([]*workflow_execution.WorkflowExecution{execc}, s.Logger)
}
}
}
func (s *ScheduleManager) GetNextScheduledWorkflows(_ tools.NATSResponse) {
start := time.Now().UTC()
fmt.Println(s.getExecution(
start.Add(time.Second*time.Duration(-1)).UTC(),
start.Add(time.Minute*time.Duration(1)).UTC(),
))
if next_wf_exec, err := s.getExecution(
start.Add(time.Second*time.Duration(-1)).UTC(),
start.Add(time.Minute*time.Duration(1)).UTC(),
); err != nil {
s.Logger.Error().Msg("Could not retrieve next schedules")
} else {
Executions.AddSchedules(next_wf_exec, s.Logger)
}
}

View File

@@ -1,23 +1,21 @@
version: '3.4' version: '3.4'
services: services:
nats: oc-schedulerd:
image: 'nats:latest' env_file:
container_name: nats - path: ./env.env
required: false
environment:
- MONGO_DATABASE=DC_myDC
- KUBE_CA=${KUBE_CA:-LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJrVENDQVRlZ0F3SUJBZ0lJWUxWNkFPQkdrU1F3Q2dZSUtvWkl6ajBFQXdJd0l6RWhNQjhHQTFVRUF3d1kKYXpOekxXTnNhV1Z1ZEMxallVQXhOekl6TVRFeU1ETTJNQjRYRFRJME1EZ3dPREV3TVRNMU5sb1hEVEkxTURndwpPREV3TVRNMU5sb3dNREVYTUJVR0ExVUVDaE1PYzNsemRHVnRPbTFoYzNSbGNuTXhGVEFUQmdOVkJBTVRESE41CmMzUmxiVHBoWkcxcGJqQlpNQk1HQnlxR1NNNDlBZ0VHQ0NxR1NNNDlBd0VIQTBJQUJGQ2Q1MFdPeWdlQ2syQzcKV2FrOWY4MVAvSkJieVRIajRWOXBsTEo0ck5HeHFtSjJOb2xROFYxdUx5RjBtOTQ2Nkc0RmRDQ2dqaXFVSk92Swp3NVRPNnd5alNEQkdNQTRHQTFVZER3RUIvd1FFQXdJRm9EQVRCZ05WSFNVRUREQUtCZ2dyQmdFRkJRY0RBakFmCkJnTlZIU01FR0RBV2dCVFJkOFI5cXVWK2pjeUVmL0ovT1hQSzMyS09XekFLQmdncWhrak9QUVFEQWdOSUFEQkYKQWlFQTArbThqTDBJVldvUTZ0dnB4cFo4NVlMalF1SmpwdXM0aDdnSXRxS3NmUVVDSUI2M2ZNdzFBMm5OVWU1TgpIUGZOcEQwSEtwcVN0Wnk4djIyVzliYlJUNklZCi0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0KLS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJlRENDQVIyZ0F3SUJBZ0lCQURBS0JnZ3Foa2pPUFFRREFqQWpNU0V3SHdZRFZRUUREQmhyTTNNdFkyeHAKWlc1MExXTmhRREUzTWpNeE1USXdNell3SGhjTk1qUXdPREE0TVRBeE16VTJXaGNOTXpRd09EQTJNVEF4TXpVMgpXakFqTVNFd0h3WURWUVFEREJock0zTXRZMnhwWlc1MExXTmhRREUzTWpNeE1USXdNell3V1RBVEJnY3Foa2pPClBRSUJCZ2dxaGtqT1BRTUJCd05DQUFRc3hXWk9pbnIrcVp4TmFEQjVGMGsvTDF5cE01VHAxOFRaeU92ektJazQKRTFsZWVqUm9STW0zNmhPeVljbnN3d3JoNnhSUnBpMW5RdGhyMzg0S0Z6MlBvMEl3UURBT0JnTlZIUThCQWY4RQpCQU1DQXFRd0R3WURWUjBUQVFIL0JBVXdBd0VCL3pBZEJnTlZIUTRFRmdRVTBYZkVmYXJsZm8zTWhIL3lmemx6Cnl0OWlqbHN3Q2dZSUtvWkl6ajBFQXdJRFNRQXdSZ0loQUxJL2dNYnNMT3MvUUpJa3U2WHVpRVMwTEE2cEJHMXgKcnBlTnpGdlZOekZsQWlFQW1wdjBubjZqN3M0MVI0QzFNMEpSL0djNE53MHdldlFmZWdEVGF1R2p3cFk9Ci0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0K}
- KUBE_TOKEN=${KUBE_TOKEN:-LS0tLS1CRUdJTiBFQyBQUklWQVRFIEtFWS0tLS0tCk1IY0NBUUVFSU5ZS1BFb1dhd1NKUzJlRW5oWmlYMk5VZlY1ZlhKV2krSVNnV09TNFE5VTlvQW9HQ0NxR1NNNDkKQXdFSG9VUURRZ0FFVUozblJZN0tCNEtUWUx0WnFUMS96VS84a0Z2Sk1lUGhYMm1Vc25pczBiR3FZblkyaVZEeApYVzR2SVhTYjNqcm9iZ1YwSUtDT0twUWs2OHJEbE03ckRBPT0KLS0tLS1FTkQgRUMgUFJJVkFURSBLRVktLS0tLQo=}
image: 'oc-schedulerd:latest'
ports: ports:
- 4222:4222 - 9001:8080
command: container_name: oc-schedulerd
- "--debug"
networks: networks:
- scheduler - oc
loki:
image: 'grafana/loki'
container_name: loki
ports :
- "3100:3100"
networks:
- scheduler
networks: networks:
scheduler: oc:
external: true external: true

11
docker_schedulerd.json Normal file
View File

@@ -0,0 +1,11 @@
{
"LOKI_URL" : "http://loki:3100",
"MONGO_URL":"mongodb://mongo:27017/",
"NATS_URL":"nats://nats:4222",
"MONGO_DATABASE":"DC_myDC",
"MONITORD_PATH": "oc-monitord",
"MODE": "kubernetes",
"KUBE_CA": "LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJlRENDQVIyZ0F3SUJBZ0lCQURBS0JnZ3Foa2pPUFFRREFqQWpNU0V3SHdZRFZRUUREQmhyTTNNdGMyVnkKZG1WeUxXTmhRREUzTkRNMk56UTNPRGt3SGhjTk1qVXdOREF6TVRBd05qSTVXaGNOTXpVd05EQXhNVEF3TmpJNQpXakFqTVNFd0h3WURWUVFEREJock0zTXRjMlZ5ZG1WeUxXTmhRREUzTkRNMk56UTNPRGt3V1RBVEJnY3Foa2pPClBRSUJCZ2dxaGtqT1BRTUJCd05DQUFUL1NDWEMycjFTWGdza0FvTGJKSEtIem4zQXYva2t0ZElpSk42WlBsWVEKY3p0dXV5K3JBMHJ5VUlkZnIyK3VCRS9VN0NjSlhPL004QVdyODFwVklzVmdvMEl3UURBT0JnTlZIUThCQWY4RQpCQU1DQXFRd0R3WURWUjBUQVFIL0JBVXdBd0VCL3pBZEJnTlZIUTRFRmdRVVFHOVBQQ0g0c1lMbFkvQk5CdnN5CklEam1PK0l3Q2dZSUtvWkl6ajBFQXdJRFNRQXdSZ0loQUtJeFc4NERQTW1URXVVN0Z3ek44SFB6ZHdldWh6U20KVzNYMU9tczFSQVNRQWlFQXI4UTJZSGtNQndSOThhcWtTa2JqU1dhejg0OEY2VkZLWjFacXpNbDFZaTg9Ci0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0K",
"KUBE_CERT": "LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJrVENDQVRlZ0F3SUJBZ0lJZWFxQUp2bHhmYzh3Q2dZSUtvWkl6ajBFQXdJd0l6RWhNQjhHQTFVRUF3d1kKYXpOekxXTnNhV1Z1ZEMxallVQXhOelF6TmpjME56ZzVNQjRYRFRJMU1EUXdNekV3TURZeU9Wb1hEVEkyTURRdwpNekV3TURZeU9Wb3dNREVYTUJVR0ExVUVDaE1PYzNsemRHVnRPbTFoYzNSbGNuTXhGVEFUQmdOVkJBTVRESE41CmMzUmxiVHBoWkcxcGJqQlpNQk1HQnlxR1NNNDlBZ0VHQ0NxR1NNNDlBd0VIQTBJQUJJelpGSlJUVHJmYXlNNFoKTjlRclN4MC9wbDdoZGdvWFM5bGEydmFFRkhlYVFaalRML2NZd1dMUnhoOWVOa01SRDZjTk4reWZkSXE2aWo1SQo5RTlENGdLalNEQkdNQTRHQTFVZER3RUIvd1FFQXdJRm9EQVRCZ05WSFNVRUREQUtCZ2dyQmdFRkJRY0RBakFmCkJnTlZIU01FR0RBV2dCVFFzUkZXUlNweDV0RGZnZDh1UTdweUw0ZERMVEFLQmdncWhrak9QUVFEQWdOSUFEQkYKQWlFQStXZTlBVXJRUm5pWjVCUERELzJwWjA3TzFQWWFIc01ycTZZcVB4VlV5cGdDSUhrRE8rcVlMYUhkUEhXZgpWUGszNXJmejM0Qk4xN2VyaEVxRjF0U0c1MWFqCi0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0KLS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJkekNDQVIyZ0F3SUJBZ0lCQURBS0JnZ3Foa2pPUFFRREFqQWpNU0V3SHdZRFZRUUREQmhyTTNNdFkyeHAKWlc1MExXTmhRREUzTkRNMk56UTNPRGt3SGhjTk1qVXdOREF6TVRBd05qSTVXaGNOTXpVd05EQXhNVEF3TmpJNQpXakFqTVNFd0h3WURWUVFEREJock0zTXRZMnhwWlc1MExXTmhRREUzTkRNMk56UTNPRGt3V1RBVEJnY3Foa2pPClBRSUJCZ2dxaGtqT1BRTUJCd05DQUFUNDF1NXIzM0JyenZ3ZXZaWHM2TEg3T1k4NGhOOGRrODdnTlhaUndBdWkKdXJBaU45TFdYcmYxeFoyaXp5d0FiVGk1ZVc2Q1hIMjhDdEVSWUlrcjNoTXdvMEl3UURBT0JnTlZIUThCQWY4RQpCQU1DQXFRd0R3WURWUjBUQVFIL0JBVXdBd0VCL3pBZEJnTlZIUTRFRmdRVTBMRVJWa1VxY2ViUTM0SGZMa082CmNpK0hReTB3Q2dZSUtvWkl6ajBFQXdJRFNBQXdSUUloQUpLWGZLdXBzdklONEtQVW50c1lPNXhiaGhSQmhSYlIKN3JyeWs2VHpZMU5JQWlBVktKWis3UUxzeGFyQktORnI3eTVYYlNGanI3Y1gyQmhOYy9wdnFLcWtFUT09Ci0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0K",
"KUBE_DATA": "LS0tLS1CRUdJTiBFQyBQUklWQVRFIEtFWS0tLS0tCk1IY0NBUUVFSUVJd01wVjdzMHc2S0VTQ2FBWDhvSVZPUHloa2U0Q3duNWZQZnhOaUYyM3JvQW9HQ0NxR1NNNDkKQXdFSG9VUURRZ0FFak5rVWxGTk90OXJJemhrMzFDdExIVCttWHVGMkNoZEwyVnJhOW9RVWQ1cEJtTk12OXhqQgpZdEhHSDE0MlF4RVBwdzAzN0o5MGlycUtQa2owVDBQaUFnPT0KLS0tLS1FTkQgRUMgUFJJVkFURSBLRVktLS0tLQo="
}

View File

@@ -3,7 +3,7 @@ package "main" {
class Graph { class Graph {
[]DataModel Datas []DataModel Datas
[]ComputingModel Computings []ComputingModel Computings
[]DatacenterModel Datacenters []ComputeModel Computes
[]StorageModel Storages []StorageModel Storages
map[string, Link] Links map[string, Link] Links
HttpQuery ws HttpQuery ws
@@ -13,7 +13,7 @@ package "main" {
GetWorkflowComponents(workflow string) GetWorkflowComponents(workflow string)
GetLinks(workflow string) GetLinks(workflow string)
AddDataModel(id string, user_input gjson.Result, wf_id string) error AddDataModel(id string, user_input gjson.Result, wf_id string) error
AddDatacenterModel(id string, user_input gjson.Result, wf_id string) error AddComputeModel(id string, user_input gjson.Result, wf_id string) error
AddComputingModel(id string, user_input gjson.Result, wf_id string) error AddComputingModel(id string, user_input gjson.Result, wf_id string) error
AddStorageModel(id string, user_input gjson.Result, wf_id string) error AddStorageModel(id string, user_input gjson.Result, wf_id string) error
ExportToArgo(id string) error ExportToArgo(id string) error

View File

@@ -4,14 +4,14 @@ package "main" {
[]Link Links []Link Links
[]DataModel Datas []DataModel Datas
[]ComputingModel Computings []ComputingModel Computings
[]DatacenterModel Datacenters []ComputeModel Computes
[]StorageModel Storages []StorageModel Storages
HttpQuery ws HttpQuery ws
GetGraphList(apiurl string) (map[string]string, error) GetGraphList(apiurl string) (map[string]string, error)
LoadFrom(workspace string) error LoadFrom(workspace string) error
AddDataModel(id string) error AddDataModel(id string) error
AddDatacenterModel(id string) error AddComputeModel(id string) error
AddComputingModel(id string) error AddComputingModel(id string) error
AddStorageModel(id string) error AddStorageModel(id string) error
ExportToArgo(id string) error ExportToArgo(id string) error

View File

@@ -1,14 +0,0 @@
package main
import (
"oc-scheduler/daemons"
"oc-scheduler/models"
"testing"
)
func TestCreateManifest(t *testing.T){
em := daemons.ExecutionManager{}
em.CreateManifest(models.Booking{},"fessity-chlics_23_07_2024_154326")
}

126
go.mod
View File

@@ -1,100 +1,84 @@
module oc-scheduler module oc-schedulerd
go 1.22.0 go 1.25.0
toolchain go1.22.5
require ( require (
cloud.o-forge.io/core/oc-lib v0.0.0-20260312083310-f5e199132416
github.com/beego/beego v1.12.12 github.com/beego/beego v1.12.12
github.com/beego/beego/v2 v2.2.2 github.com/google/uuid v1.6.0
github.com/goraz/onion v0.1.3 github.com/goraz/onion v0.1.3
github.com/nats-io/nats.go v1.9.1 github.com/rs/zerolog v1.34.0
github.com/nwtgck/go-fakelish v0.1.3 go.mongodb.org/mongo-driver v1.17.4
github.com/rs/zerolog v1.33.0 k8s.io/api v0.35.1
github.com/tidwall/gjson v1.17.1 k8s.io/apimachinery v0.35.1
gopkg.in/yaml.v3 v3.0.1 k8s.io/client-go v0.35.1
k8s.io/client-go v0.30.3
) )
require ( require (
cloud.o-forge.io/core/oc-lib v0.0.0-20240812075555-6e3069068ce4 // indirect github.com/beego/beego/v2 v2.3.8 // indirect
github.com/Klathmon/StructToMap v0.0.0-20140724123129-3d0229e2dce7 // indirect
github.com/antihax/optional v1.0.0 // indirect
github.com/aws/aws-sdk-go v1.36.29 // indirect
github.com/beorn7/perks v1.0.1 // indirect github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/biter777/countries v1.7.5 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect
github.com/emicklei/go-restful/v3 v3.11.0 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.5 // indirect github.com/emicklei/go-restful/v3 v3.12.2 // indirect
github.com/go-logr/logr v1.4.1 // indirect github.com/fxamacker/cbor/v2 v2.9.0 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect github.com/gabriel-vasile/mimetype v1.4.9 // indirect
github.com/go-logr/logr v1.4.3 // indirect
github.com/go-openapi/jsonpointer v0.21.0 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.22.3 // indirect github.com/go-openapi/swag v0.23.0 // indirect
github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.22.0 // indirect github.com/go-playground/validator/v10 v10.27.0 // indirect
github.com/go-stack/stack v1.8.0 // indirect github.com/golang/snappy v1.0.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect github.com/google/gnostic-models v0.7.0 // indirect
github.com/golang/protobuf v1.5.4 // indirect github.com/hashicorp/golang-lru v1.0.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/imdario/mergo v0.3.8 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/josharian/intern v1.0.0 // indirect github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.9 // indirect github.com/klauspost/compress v1.18.0 // indirect
github.com/leodido/go-urn v1.4.0 // indirect github.com/leodido/go-urn v1.4.0 // indirect
github.com/libp2p/go-libp2p/core v0.43.0-rc2 // indirect
github.com/mailru/easyjson v0.7.7 // indirect github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-colorable v0.1.14 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect
github.com/montanaflynn/stats v0.7.1 // indirect github.com/montanaflynn/stats v0.7.1 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/nats-io/jwt v0.3.2 // indirect github.com/nats-io/nats.go v1.44.0 // indirect
github.com/nats-io/nkeys v0.1.3 // indirect github.com/nats-io/nkeys v0.4.11 // indirect
github.com/nats-io/nuid v1.0.1 // indirect github.com/nats-io/nuid v1.0.1 // indirect
github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.23.0 // indirect
github.com/prometheus/client_golang v1.19.0 // indirect github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/client_model v0.5.0 // indirect github.com/prometheus/common v0.65.0 // indirect
github.com/prometheus/common v0.48.0 // indirect github.com/prometheus/procfs v0.17.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect github.com/shiena/ansicolor v0.0.0-20230509054315-a9deabde6e02 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect github.com/x448/float16 v0.8.4 // indirect
github.com/shiena/ansicolor v0.0.0-20200904210342-c7312218db18 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
github.com/ugorji/go/codec v1.1.7 // indirect
github.com/vk496/cron v1.2.0 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c // indirect
github.com/xdg/stringprep v0.0.0-20180714160509-73f8eece6fdc // indirect
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect
go.mongodb.org/mongo-driver v1.16.1 // indirect go.yaml.in/yaml/v2 v2.4.3 // indirect
golang.org/x/crypto v0.26.0 // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect
golang.org/x/net v0.28.0 // indirect golang.org/x/crypto v0.44.0 // indirect
golang.org/x/oauth2 v0.16.0 // indirect golang.org/x/net v0.47.0 // indirect
golang.org/x/sync v0.8.0 // indirect golang.org/x/oauth2 v0.30.0 // indirect
golang.org/x/sys v0.24.0 // indirect golang.org/x/sync v0.18.0 // indirect
golang.org/x/term v0.23.0 // indirect golang.org/x/sys v0.38.0 // indirect
golang.org/x/text v0.17.0 // indirect golang.org/x/term v0.37.0 // indirect
golang.org/x/time v0.3.0 // indirect golang.org/x/text v0.31.0 // indirect
google.golang.org/appengine v1.6.7 // indirect golang.org/x/time v0.9.0 // indirect
google.golang.org/protobuf v1.34.1 // indirect google.golang.org/protobuf v1.36.8 // indirect
gopkg.in/evanphx/json-patch.v4 v4.13.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/api v0.30.3 // indirect k8s.io/klog/v2 v2.130.1 // indirect
k8s.io/apimachinery v0.30.3 // indirect k8s.io/kube-openapi v0.0.0-20250910181357-589584f1c912 // indirect
k8s.io/klog/v2 v2.120.1 // indirect k8s.io/utils v0.0.0-20251002143259-bc988d571ff4 // indirect
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730 // indirect
k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect sigs.k8s.io/randfill v1.0.0 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect sigs.k8s.io/structured-merge-diff/v6 v6.3.0 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect sigs.k8s.io/yaml v1.6.0 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
) )

763
go.sum

File diff suppressed because it is too large Load Diff

File diff suppressed because one or more lines are too long

View File

@@ -1,59 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Note: the example only works with the code within the same release/branch.
package k8s
import (
"flag"
"path/filepath"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
//
// Uncomment to load all auth plugins
// _ "k8s.io/client-go/plugin/pkg/client/auth"
//
// Or uncomment to load specific auth plugins
// _ "k8s.io/client-go/plugin/pkg/client/auth/azure"
// _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
// _ "k8s.io/client-go/plugin/pkg/client/auth/oidc"
)
func NewK8SClient() *kubernetes.Clientset {
var kubeconfig *string
if home := homedir.HomeDir(); home != "" {
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
} else {
kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
}
flag.Parse()
// use the current context in kubeconfig
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
if err != nil {
panic(err.Error())
}
// create the clientset
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err.Error())
}
return clientset
}

View File

@@ -1,13 +0,0 @@
package logger
import (
"cloud.o-forge.io/core/oc-lib/logs"
"github.com/rs/zerolog"
)
var Logger zerolog.Logger
func init() {
logs.SetAppName("oc-scheduler")
Logger = logs.CreateLogger("","")
}

75
main.go
View File

@@ -1,56 +1,53 @@
package main package main
import ( import (
"fmt" "oc-schedulerd/conf"
"oc-schedulerd/daemons"
conf "oc-scheduler/conf" "os"
"oc-scheduler/models"
"oc-scheduler/daemons"
oclib "cloud.o-forge.io/core/oc-lib" oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/tools"
) )
// var log zerolog.Logger var appname = "oc-schedulerd"
func main() { func main() {
var bookings models.ScheduledBooking oclib.InitDaemon(appname)
l := oclib.GetLogger()
oclib.SetConfig(conf.GetConfig().MongoUrl,"DC_myDC") o := oclib.GetConfLoader(appname)
oclib.Init("oc-scheduler")
app_conf := conf.GetConfig() conf.GetConfig().KubeHost = o.GetStringDefault("KUBERNETES_SERVICE_HOST", os.Getenv("KUBERNETES_SERVICE_HOST"))
apiurl := app_conf.OcCatalogUrl conf.GetConfig().KubePort = o.GetStringDefault("KUBERNETES_SERVICE_PORT", "6443")
sch_mngr := daemons.ScheduleManager{Api_url: apiurl} conf.GetConfig().KubeCA = o.GetStringDefault("KUBE_CA", os.Getenv("KUBE_CA"))
sch_mngr.SetBookings(&bookings) conf.GetConfig().KubeCert = o.GetStringDefault("KUBE_CERT", os.Getenv("KUBE_CERT"))
conf.GetConfig().KubeData = o.GetStringDefault("KUBE_DATA", os.Getenv("KUBE_DATA"))
// Test if oc-monitor binary is reachable
// For local executions
if _, err := os.Stat("../oc-monitord/oc-monitord"); err == nil {
conf.GetConfig().MonitorPath = "../oc-monitord/oc-monitord"
}
// For container executions
if _, err := os.Stat("/usr/bin/oc-monitord"); conf.GetConfig().MonitorPath == "" && err == nil {
conf.GetConfig().MonitorPath = "/usr/bin/oc-monitord"
}
if conf.GetConfig().MonitorPath == "" {
l.Fatal().Msg("Could not find oc-monitord binary")
}
l.Info().Msg("oc-monitord binary at " + conf.GetConfig().MonitorPath)
sch_mngr := daemons.ScheduleManager{Logger: oclib.GetLogger()}
exe_mngr := daemons.ExecutionManager{} exe_mngr := daemons.ExecutionManager{}
exe_mngr.SetBookings(&bookings)
go sch_mngr.ListenForWorkflowSubmissions() go tools.NewNATSCaller().ListenNats(map[tools.NATSMethod]func(tools.NATSResponse){
tools.CREATE_EXECTUTION: sch_mngr.GetNextScheduledWorkflows, // TODO: unused for now...
tools.WORKFLOW_EVENT: sch_mngr.ExecuteWorkflow,
tools.REMOVE_EXECUTION: daemons.Executions.DeleteSchedules, // TODO: unused for now...
})
go sch_mngr.SchedulePolling() go sch_mngr.SchedulePolling()
exe_mngr.RetrieveNextExecutions() exe_mngr.RetrieveNextExecutions()
// method in Schedule manager that checks the first Schedule object for its start date and exe
// var g Graph
// list, err := g.GetGraphList(apiurl)
// if err != nil {
// log.Fatal().Msg("Failed to get the workspaces list, check api url and that api server is up : " + apiurl)
// }
// println("Available workspaces :")
// for workspace, _ := range list {
// println(workspace)
// }
// g.LoadFrom(list["test-alpr"])
// g.ExportToArgo("test-alpr")
fmt.Print("stop")
} }

View File

@@ -4,12 +4,12 @@ metadata:
name: test-monitor name: test-monitor
spec: spec:
containers: containers:
- name: "oc-monitor-quity-anetran" - name: "oc-workflow-prous-skintris"
image: docker.io/library/oc-monitor # Currently uses the local contenaird image: docker.io/library/oc-monitor # Currently uses the local contenaird
imagePullPolicy: IfNotPresent # This should be removed once a registry has been set up imagePullPolicy: IfNotPresent # This should be removed once a registry has been set up
env: env:
- name: "OCMONITOR_ARGOFILE" - name: "OCMONITOR_ARGOFILE"
value: "quity-anetran_29_07_2024_144136.yml" value: "prous-skintris_29_07_2024_164008.yml"
- name: "OCMONITOR_LOKIURL" - name: "OCMONITOR_LOKIURL"
value: "info" # !!!! In dev this must be replaced with the address of one of your interface (wifi, ethernet..) value: "info" # !!!! In dev this must be replaced with the address of one of your interface (wifi, ethernet..)
restartPolicy: OnFailure restartPolicy: OnFailure

View File

@@ -1,53 +0,0 @@
package models
import (
"io"
"net/http"
"net/http/cookiejar"
"net/url"
)
type HttpQuery struct {
baseurl string
jar http.CookieJar
Cookies map[string]string
}
func (h *HttpQuery) Init(url string) {
h.baseurl = url
h.jar, _ = cookiejar.New(nil)
h.Cookies = make(map[string]string)
}
func (h *HttpQuery) Get(url string) ([]byte, error) {
client := &http.Client{Jar: h.jar}
resp, err := client.Get(h.baseurl + url)
if err != nil {
return nil, err
}
// store received cookies
for _, cookie := range h.jar.Cookies(resp.Request.URL) {
h.Cookies[cookie.Name] = cookie.Value
}
if err != nil {
return nil, err
}
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
return body, nil
}
func (h *HttpQuery) Post(url string, data url.Values) (*http.Response, error) {
client := &http.Client{Jar: h.jar}
resp, err := client.PostForm(h.baseurl+url, data)
if err != nil {
return nil, err
}
// store received cookies
for _, cookie := range h.jar.Cookies(resp.Request.URL) {
h.Cookies[cookie.Name] = cookie.Value
}
return resp, err
}

View File

@@ -1,71 +0,0 @@
package models
import (
"fmt"
"oc-scheduler/logger"
"sync"
"time"
)
// Is duration really important ?
type Booking struct {
Start time.Time
Stop time.Time
Duration uint
Workflow string
}
type ScheduledBooking struct {
Bookings []Booking
Mu sync.Mutex
}
func (s Booking) Equals(other Booking) bool {
return s.Workflow == other.Workflow && s.Start == other.Start && s.Stop == other.Stop
}
func (sb *ScheduledBooking) AddSchedule(new_booking Booking){
if(!sb.scheduleAlreadyExists(new_booking)){
sb.Bookings = append(sb.Bookings,new_booking)
logger.Logger.Info().Msg("Updated list schedules : \n " + sb.String())
} else {
// Debug condition : delete once this feature is ready to be implemented
logger.Logger.Debug().Msg("Workflow received not added")
logger.Logger.Debug().Msg("current schedule contains")
for _, booking := range(sb.Bookings){
logger.Logger.Debug().Msg(booking.String())
}
}
}
func (sb *ScheduledBooking) GetListNames()(list_names []string ){
for _, schedule := range(sb.Bookings){
list_names = append(list_names, schedule.Workflow)
}
return
}
func (sb *ScheduledBooking) scheduleAlreadyExists(new_booking Booking) bool {
for _, booking := range(sb.Bookings){
if booking.Equals(new_booking){
return true
}
}
return false
}
func (b *Booking) String() string {
return fmt.Sprintf("{workflow : %s , start_date : %s , stop_date : %s }", b.Workflow, b.Start.Format(time.RFC3339), b.Stop.Format(time.RFC3339))
}
func (sb *ScheduledBooking) String() string {
var str string
for _, booking := range(sb.Bookings){
str += fmt.Sprintf("%s\n", booking.String())
}
return str
}

View File

@@ -1,40 +0,0 @@
package models
type Parameter struct {
Name string `yaml:"name,omitempty"`
Value string `yaml:"value,omitempty"`
}
type Container struct {
Image string `yaml:"image"`
Command []string `yaml:"command,omitempty,flow"`
Args []string `yaml:"args,omitempty,flow"`
VolumeMounts []VolumeMount `yaml:"volumeMounts,omitempty"`
}
type VolumeMount struct {
Name string `yaml:"name"`
MountPath string `yaml:"mountPath"`
}
type Task struct {
Name string `yaml:"name"`
Template string `yaml:"template"`
Dependencies []string `yaml:"dependencies,omitempty"`
Arguments struct {
Parameters []Parameter `yaml:"parameters,omitempty"`
} `yaml:"arguments,omitempty"`
}
type Dag struct {
Tasks []Task `yaml:"tasks,omitempty"`
}
type Template struct {
Name string `yaml:"name"`
Inputs struct {
Parameters []Parameter `yaml:"parameters"`
} `yaml:"inputs,omitempty"`
Container Container `yaml:"container,omitempty"`
Dag Dag `yaml:"dag,omitempty"`
}

View File

@@ -1,19 +0,0 @@
package models
type VolumeClaimTemplate struct {
Metadata struct {
Name string `yaml:"name"`
} `yaml:"metadata"`
Spec VolumeSpec `yaml:"spec"`
}
type VolumeSpec struct {
AccessModes []string `yaml:"accessModes,flow"`
Resources struct {
Requests struct {
Storage string `yaml:"storage"`
} `yaml:"requests"`
} `yaml:"resources"`
}

Binary file not shown.

12
schedulerd.json Normal file
View File

@@ -0,0 +1,12 @@
{
"LOKI_URL" : "http://172.16.0.181:3100",
"MONGO_URL":"mongodb://172.16.0.181:27017/",
"NATS_URL":"nats://172.16.0.181:4222",
"MONGO_DATABASE":"DC_myDC",
"MONITORD_PATH": "../oc-monitord/oc-monitord",
"KUBERNETES_SERVICE_HOST" : "172.16.0.181",
"MODE": "container",
"KUBE_CA": "LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJlRENDQVIyZ0F3SUJBZ0lCQURBS0JnZ3Foa2pPUFFRREFqQWpNU0V3SHdZRFZRUUREQmhyTTNNdGMyVnkKZG1WeUxXTmhRREUzTkRNMk56UTNPRGt3SGhjTk1qVXdOREF6TVRBd05qSTVXaGNOTXpVd05EQXhNVEF3TmpJNQpXakFqTVNFd0h3WURWUVFEREJock0zTXRjMlZ5ZG1WeUxXTmhRREUzTkRNMk56UTNPRGt3V1RBVEJnY3Foa2pPClBRSUJCZ2dxaGtqT1BRTUJCd05DQUFUL1NDWEMycjFTWGdza0FvTGJKSEtIem4zQXYva2t0ZElpSk42WlBsWVEKY3p0dXV5K3JBMHJ5VUlkZnIyK3VCRS9VN0NjSlhPL004QVdyODFwVklzVmdvMEl3UURBT0JnTlZIUThCQWY4RQpCQU1DQXFRd0R3WURWUjBUQVFIL0JBVXdBd0VCL3pBZEJnTlZIUTRFRmdRVVFHOVBQQ0g0c1lMbFkvQk5CdnN5CklEam1PK0l3Q2dZSUtvWkl6ajBFQXdJRFNRQXdSZ0loQUtJeFc4NERQTW1URXVVN0Z3ek44SFB6ZHdldWh6U20KVzNYMU9tczFSQVNRQWlFQXI4UTJZSGtNQndSOThhcWtTa2JqU1dhejg0OEY2VkZLWjFacXpNbDFZaTg9Ci0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0K",
"KUBE_CERT": "LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJrVENDQVRlZ0F3SUJBZ0lJZWFxQUp2bHhmYzh3Q2dZSUtvWkl6ajBFQXdJd0l6RWhNQjhHQTFVRUF3d1kKYXpOekxXTnNhV1Z1ZEMxallVQXhOelF6TmpjME56ZzVNQjRYRFRJMU1EUXdNekV3TURZeU9Wb1hEVEkyTURRdwpNekV3TURZeU9Wb3dNREVYTUJVR0ExVUVDaE1PYzNsemRHVnRPbTFoYzNSbGNuTXhGVEFUQmdOVkJBTVRESE41CmMzUmxiVHBoWkcxcGJqQlpNQk1HQnlxR1NNNDlBZ0VHQ0NxR1NNNDlBd0VIQTBJQUJJelpGSlJUVHJmYXlNNFoKTjlRclN4MC9wbDdoZGdvWFM5bGEydmFFRkhlYVFaalRML2NZd1dMUnhoOWVOa01SRDZjTk4reWZkSXE2aWo1SQo5RTlENGdLalNEQkdNQTRHQTFVZER3RUIvd1FFQXdJRm9EQVRCZ05WSFNVRUREQUtCZ2dyQmdFRkJRY0RBakFmCkJnTlZIU01FR0RBV2dCVFFzUkZXUlNweDV0RGZnZDh1UTdweUw0ZERMVEFLQmdncWhrak9QUVFEQWdOSUFEQkYKQWlFQStXZTlBVXJRUm5pWjVCUERELzJwWjA3TzFQWWFIc01ycTZZcVB4VlV5cGdDSUhrRE8rcVlMYUhkUEhXZgpWUGszNXJmejM0Qk4xN2VyaEVxRjF0U0c1MWFqCi0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0KLS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJkekNDQVIyZ0F3SUJBZ0lCQURBS0JnZ3Foa2pPUFFRREFqQWpNU0V3SHdZRFZRUUREQmhyTTNNdFkyeHAKWlc1MExXTmhRREUzTkRNMk56UTNPRGt3SGhjTk1qVXdOREF6TVRBd05qSTVXaGNOTXpVd05EQXhNVEF3TmpJNQpXakFqTVNFd0h3WURWUVFEREJock0zTXRZMnhwWlc1MExXTmhRREUzTkRNMk56UTNPRGt3V1RBVEJnY3Foa2pPClBRSUJCZ2dxaGtqT1BRTUJCd05DQUFUNDF1NXIzM0JyenZ3ZXZaWHM2TEg3T1k4NGhOOGRrODdnTlhaUndBdWkKdXJBaU45TFdYcmYxeFoyaXp5d0FiVGk1ZVc2Q1hIMjhDdEVSWUlrcjNoTXdvMEl3UURBT0JnTlZIUThCQWY4RQpCQU1DQXFRd0R3WURWUjBUQVFIL0JBVXdBd0VCL3pBZEJnTlZIUTRFRmdRVTBMRVJWa1VxY2ViUTM0SGZMa082CmNpK0hReTB3Q2dZSUtvWkl6ajBFQXdJRFNBQXdSUUloQUpLWGZLdXBzdklONEtQVW50c1lPNXhiaGhSQmhSYlIKN3JyeWs2VHpZMU5JQWlBVktKWis3UUxzeGFyQktORnI3eTVYYlNGanI3Y1gyQmhOYy9wdnFLcWtFUT09Ci0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0K",
"KUBE_DATA": "LS0tLS1CRUdJTiBFQyBQUklWQVRFIEtFWS0tLS0tCk1IY0NBUUVFSUVJd01wVjdzMHc2S0VTQ2FBWDhvSVZPUHloa2U0Q3duNWZQZnhOaUYyM3JvQW9HQ0NxR1NNNDkKQXdFSG9VUURRZ0FFak5rVWxGTk90OXJJemhrMzFDdExIVCttWHVGMkNoZEwyVnJhOW9RVWQ1cEJtTk12OXhqQgpZdEhHSDE0MlF4RVBwdzAzN0o5MGlycUtQa2owVDBQaUFnPT0KLS0tLS1FTkQgRUMgUFJJVkFURSBLRVktLS0tLQo="
}