diff --git a/go.mod b/go.mod index 7a41244..d0080bc 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module oc-monitord go 1.22.0 require ( - cloud.o-forge.io/core/oc-lib v0.0.0-20240924075418-021b461b0a7d + cloud.o-forge.io/core/oc-lib v0.0.0-20241010074019-69fe3f8d76f4 github.com/akamensky/argparse v1.4.0 github.com/goraz/onion v0.1.3 github.com/nats-io/nats-server/v2 v2.10.18 @@ -14,6 +14,8 @@ require ( ) require ( + github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d // indirect + github.com/beego/beego/v2 v2.3.1 // indirect github.com/gabriel-vasile/mimetype v1.4.5 // indirect github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect @@ -38,10 +40,11 @@ require ( github.com/xdg-go/scram v1.1.2 // indirect github.com/xdg-go/stringprep v1.0.4 // indirect github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect - go.mongodb.org/mongo-driver v1.17.0 // indirect - golang.org/x/crypto v0.27.0 // indirect - golang.org/x/net v0.29.0 // indirect + go.mongodb.org/mongo-driver v1.17.1 // indirect + golang.org/x/crypto v0.28.0 // indirect + golang.org/x/net v0.30.0 // indirect golang.org/x/sync v0.8.0 // indirect - golang.org/x/sys v0.25.0 // indirect - golang.org/x/text v0.18.0 // indirect + golang.org/x/sys v0.26.0 // indirect + golang.org/x/text v0.19.0 // indirect + google.golang.org/protobuf v1.35.1 // indirect ) diff --git a/go.sum b/go.sum index c0fa3cd..23af29e 100644 --- a/go.sum +++ b/go.sum @@ -46,10 +46,40 @@ cloud.o-forge.io/core/oc-lib v0.0.0-20240830131445-af18dba5563c h1:4ZoM9ONJiaeLH cloud.o-forge.io/core/oc-lib v0.0.0-20240830131445-af18dba5563c/go.mod h1:FIJD0taWLJ5pjQLJ6sfE2KlTkvbmk5SMcyrxdjsaVz0= cloud.o-forge.io/core/oc-lib v0.0.0-20240924075418-021b461b0a7d h1:f8cT/NunF+eoZLU5B9gmiT4ky99zPmnQBbj4tj23KuA= cloud.o-forge.io/core/oc-lib v0.0.0-20240924075418-021b461b0a7d/go.mod h1:FIJD0taWLJ5pjQLJ6sfE2KlTkvbmk5SMcyrxdjsaVz0= +cloud.o-forge.io/core/oc-lib v0.0.0-20240927112324-cdf513c2c454 h1:F5/oBMypnb6Mdvcf6N8y8v/DgfglPQ6VsQUY7hjC2zA= +cloud.o-forge.io/core/oc-lib v0.0.0-20240927112324-cdf513c2c454/go.mod h1:FIJD0taWLJ5pjQLJ6sfE2KlTkvbmk5SMcyrxdjsaVz0= +cloud.o-forge.io/core/oc-lib v0.0.0-20241001074325-aaef5334ce54 h1:0Wt94PIda2IOnZec9yGBfpSBgsT+kYefm904r0jPmkg= +cloud.o-forge.io/core/oc-lib v0.0.0-20241001074325-aaef5334ce54/go.mod h1:FIJD0taWLJ5pjQLJ6sfE2KlTkvbmk5SMcyrxdjsaVz0= +cloud.o-forge.io/core/oc-lib v0.0.0-20241001081722-4ec32bafa7b6 h1:skTKCsFU5UlSSrc+AWgc4wsXMoZktZK/23eAjVFBEvo= +cloud.o-forge.io/core/oc-lib v0.0.0-20241001081722-4ec32bafa7b6/go.mod h1:FIJD0taWLJ5pjQLJ6sfE2KlTkvbmk5SMcyrxdjsaVz0= +cloud.o-forge.io/core/oc-lib v0.0.0-20241002102322-c309d9762350 h1:ybK3Qz1inr9xgrJwbHjSOTNaFIyX+AVINyzqcsvpATY= +cloud.o-forge.io/core/oc-lib v0.0.0-20241002102322-c309d9762350/go.mod h1:FIJD0taWLJ5pjQLJ6sfE2KlTkvbmk5SMcyrxdjsaVz0= +cloud.o-forge.io/core/oc-lib v0.0.0-20241002120813-a09a04e1a71e h1:77QHk5JSf0q13B/Ai3xjcsGSS7nX+9AfxcsYz5oDo/A= +cloud.o-forge.io/core/oc-lib v0.0.0-20241002120813-a09a04e1a71e/go.mod h1:t+zpCTVKVdHH/BImwtMYY2QIWLMXKgY4n/JhFm3Vpu8= +cloud.o-forge.io/core/oc-lib v0.0.0-20241002141646-2797d97537f8 h1:3PFzsoP3GFFAT1sBb8ljselfAEssFpds2HVPrUnIGec= +cloud.o-forge.io/core/oc-lib v0.0.0-20241002141646-2797d97537f8/go.mod h1:t+zpCTVKVdHH/BImwtMYY2QIWLMXKgY4n/JhFm3Vpu8= +cloud.o-forge.io/core/oc-lib v0.0.0-20241002144524-de0b910e0953 h1:vu+6FyhLFbYDlC75IHhN+5wQl1oI8GpuEsS3g5LkWqw= +cloud.o-forge.io/core/oc-lib v0.0.0-20241002144524-de0b910e0953/go.mod h1:t+zpCTVKVdHH/BImwtMYY2QIWLMXKgY4n/JhFm3Vpu8= +cloud.o-forge.io/core/oc-lib v0.0.0-20241003074627-1a061f2d1fa4 h1:ZEFtSzUhtHm1jQ2KdS9WcX3R38zFht+LbzpAX2SgD5Q= +cloud.o-forge.io/core/oc-lib v0.0.0-20241003074627-1a061f2d1fa4/go.mod h1:t+zpCTVKVdHH/BImwtMYY2QIWLMXKgY4n/JhFm3Vpu8= +cloud.o-forge.io/core/oc-lib v0.0.0-20241003152554-3388fcc6f354 h1:/2HhRinnZTnuS/vT8TpCvkQYRJM+3aCoTxwe3JjoZG0= +cloud.o-forge.io/core/oc-lib v0.0.0-20241003152554-3388fcc6f354/go.mod h1:t+zpCTVKVdHH/BImwtMYY2QIWLMXKgY4n/JhFm3Vpu8= +cloud.o-forge.io/core/oc-lib v0.0.0-20241004084230-c083ce748cb2 h1:SMMnV8jKaJ4RPi5E4EHX8FX4+bfvu0KvBRgiB8OBuEw= +cloud.o-forge.io/core/oc-lib v0.0.0-20241004084230-c083ce748cb2/go.mod h1:t+zpCTVKVdHH/BImwtMYY2QIWLMXKgY4n/JhFm3Vpu8= +cloud.o-forge.io/core/oc-lib v0.0.0-20241009111931-84024a143e67 h1:0CMdmukFqTrGv8smRCBYG2pVAFdZj4AEcyBhltyyqYM= +cloud.o-forge.io/core/oc-lib v0.0.0-20241009111931-84024a143e67/go.mod h1:t+zpCTVKVdHH/BImwtMYY2QIWLMXKgY4n/JhFm3Vpu8= +cloud.o-forge.io/core/oc-lib v0.0.0-20241010065522-17749c6c0bd1 h1:tgzkJK/lJ7JBwNTvstiltGdgJwfbBrCPiXnQKkUYW1U= +cloud.o-forge.io/core/oc-lib v0.0.0-20241010065522-17749c6c0bd1/go.mod h1:t+zpCTVKVdHH/BImwtMYY2QIWLMXKgY4n/JhFm3Vpu8= +cloud.o-forge.io/core/oc-lib v0.0.0-20241010074019-69fe3f8d76f4 h1:8TC9Ahg2ZlqhfoYulCB/z9CzNc5zbkP2jQ0ul4AUUzo= +cloud.o-forge.io/core/oc-lib v0.0.0-20241010074019-69fe3f8d76f4/go.mod h1:t+zpCTVKVdHH/BImwtMYY2QIWLMXKgY4n/JhFm3Vpu8= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d h1:licZJFw2RwpHMqeKTCYkitsPqHNxTmd4SNR5r94FGM8= +github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d/go.mod h1:asat636LX7Bqt5lYEZ27JNDcqxfjdBQuJ/MM4CN/Lzo= github.com/akamensky/argparse v1.4.0 h1:YGzvsTqCvbEZhL8zZu2AiA5nq805NZh75JNj4ajn1xc= github.com/akamensky/argparse v1.4.0/go.mod h1:S5kwC7IuDcEr5VeXtGPRVZ5o/FdhcMlQz4IZQuw64xA= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= +github.com/beego/beego/v2 v2.3.1 h1:7MUKMpJYzOXtCUsTEoXOxsDV/UcHw6CPbaWMlthVNsc= +github.com/beego/beego/v2 v2.3.1/go.mod h1:5cqHsOHJIxkq44tBpRvtDe59GuVRVv/9/tyVDxd5ce4= github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/etcd v3.3.17+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= @@ -177,6 +207,8 @@ go.mongodb.org/mongo-driver v1.16.1 h1:rIVLL3q0IHM39dvE+z2ulZLp9ENZKThVfuvN/IiN4 go.mongodb.org/mongo-driver v1.16.1/go.mod h1:oB6AhJQvFQL4LEHyXi6aJzQJtBiTQHiAd83l0GdFaiw= go.mongodb.org/mongo-driver v1.17.0 h1:Hp4q2MCjvY19ViwimTs00wHi7G4yzxh4/2+nTx8r40k= go.mongodb.org/mongo-driver v1.17.0/go.mod h1:wwWm/+BuOddhcq3n68LKRmgk2wXzmF6s0SFOa0GINL4= +go.mongodb.org/mongo-driver v1.17.1 h1:Wic5cJIwJgSpBhe3lx3+/RybR5PiYRMpVFgO7cOHyIM= +go.mongodb.org/mongo-driver v1.17.1/go.mod h1:wwWm/+BuOddhcq3n68LKRmgk2wXzmF6s0SFOa0GINL4= golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191112222119-e1110fd1c708/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= @@ -187,6 +219,8 @@ golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw= golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54= golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A= golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70= +golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw= +golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= @@ -199,6 +233,8 @@ golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= +golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4= +golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= @@ -224,6 +260,8 @@ golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= +golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -236,11 +274,17 @@ golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM= +golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= +google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA= +google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU= diff --git a/main.go b/main.go index d713365..aa589ac 100644 --- a/main.go +++ b/main.go @@ -95,7 +95,7 @@ func main() { logger.Error().Msg("Could not retrieve workflow " + conf.GetConfig().WorkflowID + " from oc-catalog API") } - argo_file_path, err := new_wf.ExportToArgo(conf.GetConfig().Timeout) + argo_file_path, stepMax, err := new_wf.ExportToArgo(conf.GetConfig().Timeout) if err != nil { logger.Error().Msg("Could not create the Argo file for " + conf.GetConfig().WorkflowID) logger.Error().Msg(err.Error()) @@ -107,20 +107,17 @@ func main() { wf_logger = logger.With().Str("argo_name", workflowName).Str("workflow_id", conf.GetConfig().WorkflowID).Str("workflow_execution_id", conf.GetConfig().ExecutionID).Logger() wf_logger.Debug().Msg("Testing argo name") - executeWorkflow(argo_file_path) + executeWorkflow(argo_file_path, stepMax) } // Return the Workflow ID associated to a workflow execution object func getWorkflowId(exec_id string) string { - res := oclib.LoadOne(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), exec_id) - if res.Code != 200 { logger.Error().Msg("Could not retrieve workflow ID from execution ID " + exec_id) return "" } - wf_exec := res.ToWorkflowExecution() return wf_exec.WorkflowID @@ -128,29 +125,22 @@ func getWorkflowId(exec_id string) string { // So far we only log the output from -func executeWorkflow(argo_file_path string) { +func executeWorkflow(argo_file_path string, stepMax int) { // var stdout, stderr, stdout_logs, stderr_logs io.ReadCloser var stdout, stderr io.ReadCloser // var stderr io.ReadCloser var err error - cmd := exec.Command("argo", "submit", "--watch", "./argo_workflows/"+argo_file_path, "--serviceaccount=argo", "-n", "argo") + cmd := exec.Command("argo", "submit", "--log", "./argo_workflows/"+argo_file_path, "--serviceaccount=argo", "-n", "argo") fmt.Println(cmd) if stdout, err = cmd.StdoutPipe(); err != nil { wf_logger.Error().Msg("Could not retrieve stdoutpipe " + err.Error()) return } - - if stderr, err = cmd.StderrPipe(); err != nil { - wf_logger.Error().Msg("Could not retrieve stderrpipe " + err.Error()) - return - } - if err := cmd.Start(); err != nil { panic(err) } - var wg sync.WaitGroup - go logWorkflow(stdout, &wg) + go logWorkflow(argo_file_path, stepMax, stdout, &wg) if err := cmd.Wait(); err != nil { wf_logger.Error().Msg("Could not execute argo submit") @@ -163,46 +153,43 @@ func executeWorkflow(argo_file_path string) { // We could improve this function by creating an object with the same attribute as the output // and only send a new log if the current object has different values than the previous -func logWorkflow(pipe io.ReadCloser, wg *sync.WaitGroup) { - var current_watch, previous_watch models.ArgoWatch +func logWorkflow(argo_file_path string, stepMax int, pipe io.ReadCloser, wg *sync.WaitGroup) { + var current_watch, previous_watch *models.ArgoWatch + split := strings.Split(argo_file_path, "_") + argoLogs := models.NewArgoLogs(split[0], "argo", stepMax) watch_output := make([]string, 0) scanner := bufio.NewScanner(pipe) + for scanner.Scan() { log := scanner.Text() watch_output = append(watch_output, log) - - if strings.HasPrefix(log, "Progress:") { - - current_watch = *models.NewArgoLogs(watch_output) - workflowName = current_watch.Name - if !current_watch.Equals(previous_watch) { - wg.Add(1) - checkStatus(current_watch.Status, previous_watch.Status) - jsonified, err := json.Marshal(current_watch) - if err != nil { - logger.Error().Msg("Could not create watch log") - } - wf_logger.Info().Msg(string(jsonified)) - previous_watch = current_watch - current_watch = models.ArgoWatch{} - wg.Done() + if strings.Contains(log, "Progress:") { + current_watch = argoLogs.StopStepRecording(watch_output) + watch_output = []string{} + } else if strings.Contains(log, "sub-process exited") { + current_watch = argoLogs.StopStepRecording(watch_output) + } + if current_watch != nil && !current_watch.Equals(previous_watch) && current_watch.Name != "" { + wg.Add(1) + checkStatus(current_watch, previous_watch) + jsonified, err := json.Marshal(current_watch) + if err != nil { + logger.Error().Msg("Could not create watch log") } + if current_watch.Status == "Failed" { + wf_logger.Error().Msg(string(jsonified)) + } else { + wf_logger.Info().Msg(string(jsonified)) + } + previous_watch = current_watch + current_watch = &models.ArgoWatch{} + watch_output = []string{} + wg.Done() } } } -// Debug, no logs sent -func logPods(pipe io.ReadCloser, name string) { - pods_logger = wf_logger.With().Str("pod_name", name).Logger() - scanner := bufio.NewScanner(pipe) - for scanner.Scan() { - log := scanner.Text() - pods_logger.Info().Msg(log) - } - -} - func loadConfig(is_k8s bool, parser *argparse.Parser) { var o *onion.Onion @@ -298,9 +285,9 @@ func getContainerName(argo_file string) string { } // Uses the ArgoWatch object to update status of the workflow execution object -func checkStatus(current string, previous string) { - if current != previous { - updateStatus(current) +func checkStatus(current *models.ArgoWatch, previous *models.ArgoWatch) { + if previous != nil && current.Status != previous.Status { + updateStatus(current.Status) } } @@ -317,6 +304,4 @@ func updateStatus(status string) { if res.Code != 200 { logger.Error().Msg("Could not update status for workflow execution " + exec_id) } - - fmt.Printf("status argo : %s /nstatus db : %s", status, serialized["state"]) } diff --git a/models/argo_logs.go b/models/argo_logs.go index f79c5c9..534adac 100644 --- a/models/argo_logs.go +++ b/models/argo_logs.go @@ -1,7 +1,12 @@ package models import ( + "fmt" + "strconv" "strings" + "time" + + "github.com/acarl005/stripansi" ) type ArgoWatch struct { @@ -13,6 +18,7 @@ type ArgoWatch struct { Started string Duration string Progress string + Logs []string } type Conditions struct { @@ -20,53 +26,103 @@ type Conditions struct { Completed bool } -func (a *ArgoWatch) Equals(arg ArgoWatch) bool { +func (a *ArgoWatch) Equals(arg *ArgoWatch) bool { + if arg == nil { + return false + } return a.Status == arg.Status && a.Progress == arg.Progress && a.Conditions.PodRunning == arg.Conditions.PodRunning && a.Conditions.Completed == arg.Conditions.Completed } -// Take the slice of string that make up one round of stderr outputs from the --watch option in argo submit -func NewArgoLogs(inputs []string) *ArgoWatch { - var workflow ArgoWatch +func NewArgoLogs(name string, namespace string, stepMax int) *ArgoLogs { + return &ArgoLogs{ + Name: "oc-monitor-" + name, + Namespace: namespace, + CreatedDate: time.Now().Format("2006-01-02 15:04:05"), + StepCount: 0, + StepMax: stepMax, + stop: false, + } +} +type ArgoLogs struct { + Name string + Namespace string + CreatedDate string + StepCount int + StepMax int + stop bool + Started time.Time +} + +func (a *ArgoLogs) StartStepRecording() { + a.StepCount += 1 + a.Started = time.Now() +} + +func (a *ArgoLogs) StopStepRecording(inputs []string) *ArgoWatch { + fn := strings.Split(a.Name, "_") + logs := []string{} + err := false + end := "" for _, input := range inputs { line := strings.TrimSpace(input) - if line == "" { + if line == "" || !strings.Contains(line, fn[0]) || !strings.Contains(line, ":") { continue } - switch { - case strings.HasPrefix(line, "Name:"): - workflow.Name = parseValue(line) - case strings.HasPrefix(line, "Namespace:"): - workflow.Namespace = parseValue(line) - case strings.HasPrefix(line, "Status:"): - workflow.Status = parseValue(line) - case strings.HasPrefix(line, "PodRunning"): - workflow.PodRunning = parseBoolValue(line) - case strings.HasPrefix(line, "Completed"): - workflow.Completed = parseBoolValue(line) - case strings.HasPrefix(line, "Created:"): - workflow.Created = parseValue(line) - case strings.HasPrefix(line, "Started:"): - workflow.Started = parseValue(line) - case strings.HasPrefix(line, "Duration:"): - workflow.Duration = parseValue(line) - case strings.HasPrefix(line, "Progress:"): - workflow.Progress = parseValue(line) + step := strings.Split(line, ":") + if strings.Contains(line, "sub-process exited") { + b := strings.Split(line, "time=\"") + if len(b) > 1 { + end = b[1][:19] + } + } + if len(step) < 2 || strings.Contains(line, "time=") || strings.TrimSpace(strings.Join(step[1:], " : ")) == "" || strings.TrimSpace(strings.Join(step[1:], " : ")) == a.Name { + continue + } + log := stripansi.Strip(strings.TrimSpace(strings.Join(step[1:], " : "))) + t, e := strconv.Unquote(log) + if e == nil { + logs = append(logs, t) + } else { + logs = append(logs, strings.ReplaceAll(log, "\"", "`")) + } + + if strings.Contains(logs[len(logs)-1], "Error") { + err = true } } - - return &workflow -} - -func parseValue(line string) string { - parts := strings.SplitN(line, ":", 2) - if len(parts) < 2 { - return "" + status := "Pending" + if a.StepCount > 0 { + status = "Running" } - return strings.TrimSpace(parts[1]) -} - -func parseBoolValue(line string) bool { - value := parseValue(line) - return value == "True" + if a.StepCount == a.StepMax { + if err { + status = "Failed" + } else { + status = "Succeeded" + } + } + duration := float64(0) + if end != "" { + timeE, _ := time.Parse("2006-01-02T15:04:05", end) + duration = timeE.Sub(a.Started).Seconds() + } + argo := &ArgoWatch{ + Name: a.Name, + Namespace: a.Namespace, + Status: status, + Created: a.CreatedDate, + Started: a.Started.Format("2006-01-02 15:04:05"), + Conditions: Conditions{ + PodRunning: a.StepCount > 0 && a.StepCount < a.StepMax, + Completed: a.StepCount == a.StepMax, + }, + Progress: fmt.Sprintf("%v/%v", a.StepCount, a.StepMax), + Duration: fmt.Sprintf("%v", fmt.Sprintf("%.2f", duration)+"s"), + Logs: logs, + } + if !argo.Completed { + a.StartStepRecording() + } + return argo } diff --git a/models/services.go b/models/services.go index ba0e10a..80465d1 100644 --- a/models/services.go +++ b/models/services.go @@ -1,40 +1,36 @@ package models - type ServiceResource struct { Action string `yaml:"action,omitempty"` - SuccessCondition string `yaml:"successCondition,omitempty"` + SuccessCondition string `yaml:"successCondition,omitempty"` FailureCondition string `yaml:"failureCondition,omitempty"` SetOwnerReference bool `yaml:"setOwnerReference,omitempty"` Manifest string `yaml:"manifest,omitempty"` } type Service struct { - APIVersion string `yaml:"apiVersion"` - Kind string `yaml:"kind"` - Metadata Metadata `yaml:"metadata"` - Spec ServiceSpec `yaml:"spec"` + APIVersion string `yaml:"apiVersion"` + Kind string `yaml:"kind"` + Metadata Metadata `yaml:"metadata"` + Spec ServiceSpec `yaml:"spec"` } type Metadata struct { - Name string `yaml:"name"` - + Name string `yaml:"name"` } // ServiceSpec is the specification of the Kubernetes Service type ServiceSpec struct { - Selector map[string]string `yaml:"selector,omitempty"` - Ports []ServicePort `yaml:"ports"` - ClusterIP string `yaml:"clusterIP,omitempty"` - Type string `yaml:"type,omitempty"` + Selector map[string]string `yaml:"selector,omitempty"` + Ports []ServicePort `yaml:"ports"` + ClusterIP string `yaml:"clusterIP,omitempty"` + Type string `yaml:"type,omitempty"` } // ServicePort defines a port for a Kubernetes Service type ServicePort struct { - Name string `yaml:"name"` // Even if empty need to be in the yaml - - Protocol string `yaml:"protocol,omitempty"` - Port int64 `yaml:"port"` - TargetPort int64 `yaml:"targetPort,omitempty"` - NodePort int64 `yaml:"nodePort,omitempty"` -} \ No newline at end of file + Name string `yaml:"name"` // Even if empty need to be in the yaml + Protocol string `yaml:"protocol,omitempty"` + Port int `yaml:"port"` + TargetPort int `yaml:"targetPort,omitempty"` +} diff --git a/models/template.go b/models/template.go index c7c9b0e..c2e22ab 100644 --- a/models/template.go +++ b/models/template.go @@ -1,5 +1,12 @@ package models +import ( + "strings" + + "cloud.o-forge.io/core/oc-lib/models/resources/processing" + "cloud.o-forge.io/core/oc-lib/models/resources/storage" +) + type Parameter struct { Name string `yaml:"name,omitempty"` Value string `yaml:"value,omitempty"` @@ -12,9 +19,28 @@ type Container struct { VolumeMounts []VolumeMount `yaml:"volumeMounts,omitempty"` } +func (c *Container) AddVolumeMount(volumeMount VolumeMount, volumes []VolumeMount) []VolumeMount { + for _, vm := range c.VolumeMounts { + if vm.Name == volumeMount.Name { + return volumes + } + } + c.VolumeMounts = append(c.VolumeMounts, volumeMount) + for _, vm := range c.VolumeMounts { + for _, v := range volumes { + if vm.Name == v.Name { + return volumes + } + } + } + volumes = append(volumes, volumeMount) + return volumes +} + type VolumeMount struct { - Name string `yaml:"name"` - MountPath string `yaml:"mountPath"` + Name string `yaml:"name"` + MountPath string `yaml:"mountPath"` + Storage *storage.StorageResource `yaml:"-"` } type Task struct { @@ -39,8 +65,47 @@ type Template struct { Inputs struct { Parameters []Parameter `yaml:"parameters"` } `yaml:"inputs,omitempty"` - Container Container `yaml:"container,omitempty"` - Dag Dag `yaml:"dag,omitempty"` + Container Container `yaml:"container,omitempty"` + Dag *Dag `yaml:"dag,omitempty"` Metadata TemplateMetadata `yaml:"metadata,omitempty"` - Resource ServiceResource `yaml:"resource,omitempty"` + Resource ServiceResource `yaml:"resource,omitempty"` +} + +func (template *Template) CreateContainer(processing *processing.ProcessingResource, dag *Dag) { + container := Container{Image: processing.Container.Image} + if container.Image == "" { + return + } + container.Command = []string{"sh", "-c"} // all is bash + for name := range processing.Container.Env { + template.Inputs.Parameters = append(template.Inputs.Parameters, Parameter{Name: name}) + } + for _, a := range strings.Split(processing.Container.Args, " ") { + container.Args = append(container.Args, template.replacePerEnv(a, processing.Container.Env, dag)) + } + cmd := strings.ReplaceAll(processing.Container.Command, container.Image, "") + container.Args = []string{cmd + " " + strings.Join(container.Args, " ")} + template.Container = container +} + +func (template *Template) replacePerEnv(arg string, envs map[string]string, dag *Dag) string { + for k, v := range envs { + if strings.Contains(arg, k) { + value := v + for _, task := range dag.Tasks { + if task.Name == template.Name { + for _, p := range task.Arguments.Parameters { + if p.Name == k { + value = p.Value + break + } + } + } + } + arg = strings.ReplaceAll(arg, "$"+k, value) + arg = strings.ReplaceAll(arg, "${"+k+"}", value) + arg = strings.ReplaceAll(arg, k, value) + } + } + return arg } diff --git a/oc-monitord b/oc-monitord index b0033b6..680cae8 100755 Binary files a/oc-monitord and b/oc-monitord differ diff --git a/workflow_builder/argo_builder.go b/workflow_builder/argo_builder.go index 3e560ea..f47da61 100644 --- a/workflow_builder/argo_builder.go +++ b/workflow_builder/argo_builder.go @@ -8,12 +8,12 @@ import ( "fmt" . "oc-monitord/models" "os" - "slices" + "regexp" "strings" "time" oclib "cloud.o-forge.io/core/oc-lib" - "cloud.o-forge.io/core/oc-lib/models/resource_model" + "cloud.o-forge.io/core/oc-lib/models/resources/processing" "cloud.o-forge.io/core/oc-lib/models/resources/workflow/graph" w "cloud.o-forge.io/core/oc-lib/models/workflow" "github.com/nwtgck/go-fakelish" @@ -24,9 +24,9 @@ import ( var logger zerolog.Logger type ArgoBuilder struct { - OriginWorkflow w.Workflow + OriginWorkflow *w.Workflow Workflow Workflow - Services *Service + Services []*Service Timeout int } @@ -39,6 +39,24 @@ type Workflow struct { Spec Spec `yaml:"spec,omitempty"` } +func (b *Workflow) setDag(dag *Dag) { + for _, t := range b.Spec.Templates { + if t.Name == "dag" { + t.Dag = dag + } + } +} + +func (b *Workflow) getDag() *Dag { + for _, t := range b.Spec.Templates { + if t.Name == "dag" { + return t.Dag + } + } + b.Spec.Templates = append(b.Spec.Templates, Template{Name: "dag", Dag: &Dag{}}) + return b.Spec.Templates[len(b.Spec.Templates)-1].Dag +} + type Spec struct { Entrypoint string `yaml:"entrypoint"` Arguments []Parameter `yaml:"arguments,omitempty"` @@ -47,15 +65,11 @@ type Spec struct { Timeout int `yaml:"activeDeadlineSeconds,omitempty"` } -func (b *ArgoBuilder) CreateDAG() (string, error) { +func (b *ArgoBuilder) CreateDAG(write bool) (string, int, []string, []string, error) { // handle services by checking if there is only one processing with hostname and port - - b.createNginxVolumes() - - b.createTemplates() - b.createDAGstep() - b.createVolumes() + firstItems, lastItems, volumes := b.createTemplates() + b.createVolumes(volumes) if b.Timeout > 0 { b.Workflow.Spec.Timeout = b.Timeout @@ -63,15 +77,17 @@ func (b *ArgoBuilder) CreateDAG() (string, error) { b.Workflow.Spec.Entrypoint = "dag" b.Workflow.ApiVersion = "argoproj.io/v1alpha1" b.Workflow.Kind = "Workflow" - random_name := generateWfName() + if !write { + return "", len(b.Workflow.getDag().Tasks), firstItems, lastItems, nil + } + random_name := fakelish.GenerateFakeWord(5, 8) + "-" + fakelish.GenerateFakeWord(5, 8) b.Workflow.Metadata.Name = "oc-monitor-" + random_name logger = oclib.GetLogger() yamlified, err := yaml.Marshal(b.Workflow) if err != nil { logger.Error().Msg("Could not transform object to yaml file") - return "", err + return "", 0, firstItems, lastItems, err } - // Give a unique name to each argo file with its timestamp DD:MM:YYYY_hhmmss current_timestamp := time.Now().Format("02_01_2006_150405") file_name := random_name + "_" + current_timestamp + ".yml" @@ -79,189 +95,205 @@ func (b *ArgoBuilder) CreateDAG() (string, error) { err = os.WriteFile(workflows_dir+file_name, []byte(yamlified), 0660) if err != nil { logger.Error().Msg("Could not write the yaml file") - return "", err + return "", 0, firstItems, lastItems, err } - - return file_name, nil + return file_name, len(b.Workflow.getDag().Tasks), firstItems, lastItems, nil } -func (b *ArgoBuilder) createTemplates() { - for _, comp := range b.getProcessings() { - var command string - var args string - var env string - - comp_res := comp.Processing - - command = getStringValue(comp_res.AbstractResource, "command") - args = getStringValue(comp_res.AbstractResource, "args") - env = getStringValue(comp_res.AbstractResource, "env") - - image_name := strings.Split(command, " ")[0] // TODO : decide where to store the image name, GUI or models.computing.Image - temp_container := Container{Image: image_name} // TODO : decide where to store the image name, GUI or models.computing.Image - temp_container.Command = getComputingCommands(command) - temp_container.Args = getComputingArgs(args, command) - // Only for dev purpose, - input_names := getComputingEnvironmentName(strings.Split(env, " ")) - - var inputs_container []Parameter - for _, name := range input_names { - inputs_container = append(inputs_container, Parameter{Name: name}) +func (b *ArgoBuilder) createTemplates() ([]string, []string, []VolumeMount) { + volumes := []VolumeMount{} + firstItems := []string{} + lastItems := []string{} + for _, comp := range b.OriginWorkflow.GetProcessings() { + if comp.Processing.Container != nil { + volumes, firstItems, lastItems = b.createArgoTemplates( + comp.ID, comp.Processing, volumes, firstItems, lastItems) + } else { + logger.Error().Msg("Not enough configuration setup, template can't be created : " + comp.Processing.GetName()) + return firstItems, lastItems, volumes } - - argo_name := getArgoName(comp_res.GetName(), comp.ID) - new_temp := Template{Name: argo_name, Container: temp_container} - new_temp.Inputs.Parameters = inputs_container - new_temp.Container.VolumeMounts = append(new_temp.Container.VolumeMounts, VolumeMount{Name: "workdir", MountPath: "/mnt/vol"}) // TODO : replace this with a search of the storage / data source name - new_temp.Container.VolumeMounts = append(new_temp.Container.VolumeMounts, VolumeMount{Name: "nginx-demo", MountPath: "/usr/share/nginx"}) // Used for processing services' demo with nginx - - if b.isService(comp.ID) { - serv := b.CreateService(comp) - b.createService(serv, argo_name, comp.ID) - new_temp.Metadata.Labels = make(map[string]string) - new_temp.Metadata.Labels["app"] = "oc-service" // Construct the template for the k8s service and add a link in graph between k8s service and processing - // if err != nil { - // // TODO - // } - } - - b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, new_temp) } - - if b.Services != nil { - b.addServiceToArgo() - } - -} - -func (b *ArgoBuilder) createDAGstep() { - new_dag := Dag{} - for _, comp := range b.getProcessings() { - comp_res := comp.Processing - env := getStringValue(comp_res.AbstractResource, "env") - unique_name := getArgoName(comp_res.GetName(), comp.ID) - step := Task{Name: unique_name, Template: unique_name} - comp_envs := getComputingEnvironment(strings.Split(env, " ")) - - for name, value := range comp_envs { - step.Arguments.Parameters = append(step.Arguments.Parameters, Parameter{Name: name, Value: value}) - } - - // retrieves the name (computing.name-computing.ID) - step.Dependencies = b.getDependency(comp.ID) // Error : we use the component ID instead of the GraphItem ID -> store objects - new_dag.Tasks = append(new_dag.Tasks, step) - } - - if b.Services != nil { - new_dag.Tasks = append(new_dag.Tasks, Task{Name: "workflow-service-pod", Template: "workflow-service-pod"}) - } - - b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, Template{Name: "dag", Dag: new_dag}) - -} - -func (b *ArgoBuilder) createVolumes() { - // For testing purposes we only declare one volume, mounted in each computing - new_volume := VolumeClaimTemplate{} - new_volume.Metadata.Name = "workdir" - new_volume.Spec.AccessModes = []string{"ReadWriteOnce"} - new_volume.Spec.Resources.Requests.Storage = "1Gi" - b.Workflow.Spec.Volumes = append(b.Workflow.Spec.Volumes, new_volume) -} - -// For demo purposes, until we implement the use of storage ressources -func (b *ArgoBuilder) createNginxVolumes() { - new_volume := VolumeClaimTemplate{} - new_volume.Metadata.Name = "nginx-demo" - new_volume.Spec.AccessModes = []string{"ReadWriteOnce"} - new_volume.Spec.Resources.Requests.Storage = "1Gi" - - b.Workflow.Spec.Volumes = append(b.Workflow.Spec.Volumes, new_volume) -} - -func (b *ArgoBuilder) getDependency(current_computing_id string) (dependencies []string) { - for _, link := range b.OriginWorkflow.Graph.Links { - if b.OriginWorkflow.Graph.Items[link.Source.ID].Processing == nil { + firstWfTasks := map[string][]string{} + latestWfTasks := map[string][]string{} + relatedWfTasks := map[string][]string{} + for _, wf := range b.OriginWorkflow.GetWorkflows() { + realWorkflow, code, err := w.New().LoadOne(wf.Workflow.WorkflowID) + if code != 200 { + logger.Error().Msg("Error loading the workflow : " + err.Error()) continue } + subBuilder := ArgoBuilder{OriginWorkflow: realWorkflow.(*w.Workflow), Timeout: b.Timeout} + _, _, fi, li, err := subBuilder.CreateDAG(false) + if err != nil { + logger.Error().Msg("Error creating the subworkflow : " + err.Error()) + continue + } + firstWfTasks[wf.ID] = fi + if ok, depsOfIds := subBuilder.isArgoDependancy(wf.ID); ok { // IS BEFORE + latestWfTasks[wf.ID] = li + relatedWfTasks[wf.ID] = depsOfIds + } + subDag := subBuilder.Workflow.getDag() + d := b.Workflow.getDag() + d.Tasks = append(d.Tasks, subDag.Tasks...) // add the tasks of the subworkflow to the main workflow + b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, subBuilder.Workflow.Spec.Templates...) + b.Workflow.Spec.Volumes = append(b.Workflow.Spec.Volumes, subBuilder.Workflow.Spec.Volumes...) + b.Workflow.Spec.Arguments = append(b.Workflow.Spec.Arguments, subBuilder.Workflow.Spec.Arguments...) + b.Services = append(b.Services, subBuilder.Services...) + } + for wfID, depsOfIds := range relatedWfTasks { + for _, dep := range depsOfIds { + for _, task := range b.Workflow.getDag().Tasks { + if strings.Contains(task.Name, dep) { + index := -1 + for i, depp := range task.Dependencies { + if strings.Contains(depp, wfID) { + index = i + break + } + } + if index != -1 { + task.Dependencies = append(task.Dependencies[:index], task.Dependencies[index+1:]...) + } + task.Dependencies = append(task.Dependencies, latestWfTasks[wfID]...) + } + } + } + } + for wfID, fi := range firstWfTasks { + deps := b.getArgoDependencies(wfID) + if len(deps) > 0 { + for _, dep := range fi { + for _, task := range b.Workflow.getDag().Tasks { + if strings.Contains(task.Name, dep) { + task.Dependencies = append(task.Dependencies, deps...) + } + } + } + } + } + if b.Services != nil { + dag := b.Workflow.getDag() + dag.Tasks = append(dag.Tasks, Task{Name: "workflow-service-pod", Template: "workflow-service-pod"}) + b.addServiceToArgo() + } + return firstItems, lastItems, volumes +} + +func (b *ArgoBuilder) createArgoTemplates(id string, + processing *processing.ProcessingResource, + volumes []VolumeMount, + firstItems []string, + lastItems []string) ([]VolumeMount, []string, []string) { + _, firstItems, lastItems = b.addTaskToArgo(b.Workflow.getDag(), id, processing, firstItems, lastItems) + template := &Template{Name: getArgoName(processing.GetName(), id)} + template.CreateContainer(processing, b.Workflow.getDag()) + if len(processing.Expose) > 0 { + b.CreateService(id, processing) + template.Metadata.Labels = make(map[string]string) + template.Metadata.Labels["app"] = "oc-service" // Construct the template for the k8s service and add a link in graph between k8s service and processing + } + storages := b.OriginWorkflow.GetStoragesByRelatedProcessing(id) + for _, storage := range storages { + if storage.Local { + volumes = template.Container.AddVolumeMount(VolumeMount{ + Name: strings.ReplaceAll(strings.ToLower(storage.GetName()), " ", "-"), + MountPath: storage.Path, + Storage: storage, + }, volumes) + } + } + b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, *template) + return volumes, firstItems, lastItems +} +func (b *ArgoBuilder) addTaskToArgo(dag *Dag, graphItemID string, processing *processing.ProcessingResource, + firstItems []string, lastItems []string) (*Dag, []string, []string) { + unique_name := getArgoName(processing.GetName(), graphItemID) + step := Task{Name: unique_name, Template: unique_name} + if processing.Container != nil { + for name, value := range processing.Container.Env { + step.Arguments.Parameters = append(step.Arguments.Parameters, Parameter{ + Name: name, + Value: b.affectVariableEnv(value, b.OriginWorkflow.Graph), + }) + } + } + step.Dependencies = b.getArgoDependencies(graphItemID) + name := "" + if b.OriginWorkflow.Graph.Items[graphItemID].Processing != nil { + name = b.OriginWorkflow.Graph.Items[graphItemID].Processing.GetName() + } + if b.OriginWorkflow.Graph.Items[graphItemID].Workflow != nil { + name = b.OriginWorkflow.Graph.Items[graphItemID].Workflow.GetName() + } + if len(step.Dependencies) == 0 && name != "" { + firstItems = append(firstItems, getArgoName(name, graphItemID)) + } + if ok, _ := b.isArgoDependancy(graphItemID); !ok && name != "" { + lastItems = append(lastItems, getArgoName(name, graphItemID)) + } + dag.Tasks = append(dag.Tasks, step) + return dag, firstItems, lastItems +} + +func (b *ArgoBuilder) affectVariableEnv(envVar string, graph *graph.Graph) string { + var myExp = regexp.MustCompile(`(\{\{.*\}\})`) // regex to find all the variables in the command + matches := myExp.FindAllString(envVar, -1) // find all the variables in the command + for _, match := range matches { // for each variable in the command + splitted := strings.Split( // split the variable to get the inout and the vars only + strings.ReplaceAll(strings.ReplaceAll(strings.ReplaceAll(match, "{{", ""), "}}", ""), " ", ""), "_") + if len(splitted) < 3 { // if the variable is not well formatted, we skip it + logger.Error().Msgf("The variable %v is not well formatted", match) + continue + } + graphItemID := splitted[1] // graphitemid is the id of the object + vars := splitted[2] // vars is the name of the variable of the object + _, obj := graph.GetResource(graphItemID) + if obj != nil { + envVar = strings.ReplaceAll(envVar, match, fmt.Sprintf("%v", obj.Serialize()[vars])) + } + } + return envVar +} + +func (b *ArgoBuilder) createVolumes(volumes []VolumeMount) { // TODO : one think about remote volume but TG + for _, volume := range volumes { + new_volume := VolumeClaimTemplate{} + new_volume.Metadata.Name = strings.ReplaceAll(strings.ToLower(volume.Name), " ", "-") + new_volume.Spec.AccessModes = []string{"ReadWriteOnce"} + new_volume.Spec.Resources.Requests.Storage = fmt.Sprintf("%v", volume.Storage.Size) + volume.Storage.SizeType.ToArgo() + b.Workflow.Spec.Volumes = append(b.Workflow.Spec.Volumes, new_volume) + } +} + +func (b *ArgoBuilder) isArgoDependancy(id string) (bool, []string) { + dependancyOfIDs := []string{} + isDeps := false + for _, link := range b.OriginWorkflow.Graph.Links { + source := b.OriginWorkflow.Graph.Items[link.Destination.ID].Processing + if id == link.Source.ID && source != nil { + isDeps = true + dependancyOfIDs = append(dependancyOfIDs, getArgoName(source.GetName(), link.Destination.ID)) + } + wourceWF := b.OriginWorkflow.Graph.Items[link.Destination.ID].Workflow + if id == link.Source.ID && wourceWF != nil { + isDeps = true + dependancyOfIDs = append(dependancyOfIDs, getArgoName(wourceWF.GetName(), link.Destination.ID)) + } + } + return isDeps, dependancyOfIDs +} + +func (b *ArgoBuilder) getArgoDependencies(id string) (dependencies []string) { + for _, link := range b.OriginWorkflow.Graph.Links { source := b.OriginWorkflow.Graph.Items[link.Source.ID].Processing - fmt.Println("source", source, current_computing_id, link.Destination.ID) - if current_computing_id == link.Destination.ID && source != nil { + if id == link.Destination.ID && source != nil { dependency_name := getArgoName(source.GetName(), link.Source.ID) dependencies = append(dependencies, dependency_name) + continue } } return - -} - -func getComputingCommands(user_input string) []string { - user_input = removeImageName(user_input) - if len(user_input) == 0 { - return []string{} - } - return strings.Split(user_input, " ") -} - -func getComputingArgs(user_input string, command string) (list_args []string) { - if len(user_input) == 0 { - return - } - - args := strings.Split(user_input, " ") - - // quickfix that might need improvement - if strings.Contains(command, "sh -c") { - list_args = append(list_args, strings.Join(args, " ")) - return - } - - list_args = append(list_args, args...) - return -} - -// Currently implements code to overcome problems in data structure -func getComputingEnvironment(user_input []string) (map_env map[string]string) { - logger := oclib.GetLogger() - is_empty := len(user_input) == 0 - is_empty_string := len(user_input) == 1 && user_input[0] == "" - - if is_empty || is_empty_string { - return - } - - if len(user_input) == 1 { - user_input = strings.Split(user_input[0], ",") - } - - map_env = make(map[string]string, 0) - - for _, str := range user_input { - new_pair := strings.Split(str, "=") - - if len(new_pair) != 2 { - logger.Error().Msg("Error extracting the environment variable from " + str) - panic(0) - } - - map_env[new_pair[0]] = new_pair[1] - } - - return -} - -func getComputingEnvironmentName(user_input []string) (list_names []string) { - env_map := getComputingEnvironment(user_input) - for name := range env_map { - list_names = append(list_names, name) - } - - return -} - -func generateWfName() (Name string) { - Name = fakelish.GenerateFakeWord(5, 8) + "-" + fakelish.GenerateFakeWord(5, 8) - return } func getArgoName(raw_name string, component_id string) (formatedName string) { @@ -270,60 +302,3 @@ func getArgoName(raw_name string, component_id string) (formatedName string) { formatedName = strings.ToLower(formatedName) return } - -func removeImageName(user_input string) string { - // First command is the name of the container for now - if len(strings.Split(user_input, " ")) == 1 { - return "" - } - - slice_input := strings.Split(user_input, " ") - new_slice := slice_input[1:] - user_input = strings.Join(new_slice, " ") - - return user_input -} - -// Return the graphItem containing a Processing resource, so that we have access to the ID of the graphItem in order to identify it in the links -func (b *ArgoBuilder) getProcessings() (list_computings []graph.GraphItem) { - for _, item := range b.OriginWorkflow.Graph.Items { - if item.Processing != nil { - list_computings = append(list_computings, item) - } - } - return -} - -// Pass a GraphItem's UUID and not the ID -func (b *ArgoBuilder) IsProcessing(component_uuid string) bool { - return slices.Contains(b.OriginWorkflow.Processings, component_uuid) -} - -func getStringValue(comp resource_model.AbstractResource, key string) string { - if res := comp.GetModelValue(key); res != nil { - return res.(string) - } - return "" -} - -func (b *ArgoBuilder) isService(id string) bool { - - comp := b.OriginWorkflow.Graph.Items[id] - - if comp.Processing == nil { - return false - } - - _, is_exposed := comp.Processing.ResourceModel.Model["expose"] - return is_exposed -} - -func (b *ArgoBuilder) addLabel(name string, id string) { - argo_name := getArgoName(name, id) - for _, template := range b.Workflow.Spec.Templates { - if template.Name == argo_name { - template.Metadata.Labels["app"] = "service-workflow" - return - } - } -} diff --git a/workflow_builder/argo_services.go b/workflow_builder/argo_services.go index a990deb..3712798 100644 --- a/workflow_builder/argo_services.go +++ b/workflow_builder/argo_services.go @@ -2,147 +2,63 @@ package workflow_builder import ( "oc-monitord/models" - "strconv" "strings" - "cloud.o-forge.io/core/oc-lib/models/resource_model" - "cloud.o-forge.io/core/oc-lib/models/resources/workflow/graph" - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/bson/primitive" + "cloud.o-forge.io/core/oc-lib/models/resources/processing" "gopkg.in/yaml.v3" ) -// TODO : refactor this method or the deserialization process in oc-lib to get rid of the mongo code -func getExposeContract(expose resource_model.Model) map[string]map[string]string { - contract := make(map[string]map[string]string,0) - - mapped_info := bson.M{} - // var contract PortTranslation - _ , byt, _ := bson.MarshalValue(expose.Value) - - bson.Unmarshal(byt,&mapped_info) - - for _,v := range mapped_info { - port := v.(primitive.M)["Key"].(string) - // exposed_port := map[string]interface{}{data["Key"] : ""} - port_translation := v.(primitive.M)["Value"] - contract[port] = map[string]string{} - for _,v2 := range port_translation.(primitive.A) { - if v2.(primitive.M)["Key"] == "reverse" { - contract[port]["reverse"] = v2.(primitive.M)["Value"].(string) - } - if v2.(primitive.M)["Key"] == "PAT" { - contract[port]["PAT"] = v2.(primitive.M)["Value"].(string) - } - } +func (b *ArgoBuilder) CreateService(id string, processing *processing.ProcessingResource) { + new_service := models.Service{ + APIVersion: "v1", + Kind: "Service", + Metadata: models.Metadata{ + Name: "workflow-service", + }, + Spec: models.ServiceSpec{ + Selector: map[string]string{"app": "oc-service"}, + Ports: []models.ServicePort{}, + Type: "NodePort", + }, } - return contract + if processing == nil { + return + } + b.completeServicePorts(&new_service, id, processing) + b.Services = append(b.Services, &new_service) } - -func (b *ArgoBuilder) CreateService(processing graph.GraphItem) models.Service{ - - // model { - // Type : "dict", - // Value : { - // "80" : { - // "reverse" : "", - // "PAT" : "34000" - // }, - // "344" : { - // "reverse" : "", - // "PAT" : "34400" - // } - // } - // } - - - new_service := models.Service{APIVersion: "v1", - Kind: "Service", - Metadata: models.Metadata{ - Name: "workflow-service" , - }, - Spec: models.ServiceSpec{ - Selector: map[string]string{"app": "oc-service"}, - Ports: []models.ServicePort{ - }, - Type: "NodePort", - }, - } - - completeServicePorts(&new_service, processing) - yamlified, _ := yaml.Marshal(new_service) - x := string(yamlified) - _ = x - return new_service -} - -func completeServicePorts(service *models.Service, processing graph.GraphItem) { - - contract := getExposeContract(processing.Processing.ResourceModel.Model["expose"]) - - - for str_port,translation_dict := range contract{ - - port, err := strconv.ParseInt(str_port, 10, 64) - if err != nil { - logger.Error().Msg("Could not convert " + str_port + "to an int") - return - } - - - if _, ok := translation_dict["PAT"]; ok{ - port_translation, err := strconv.ParseInt(translation_dict["PAT"], 10, 64) - if err != nil { - logger.Error().Msg("Could not convert " + translation_dict["PAT"] + "to an int") - return - } - - - +func (b *ArgoBuilder) completeServicePorts(service *models.Service, id string, processing *processing.ProcessingResource) { + for _, execute := range processing.Expose { + if execute.PAT != 0 { new_port_translation := models.ServicePort{ - Name: strings.ToLower(processing.Processing.Name) + processing.ID, - Port: port_translation-30000, - TargetPort: port, - NodePort: port_translation, - Protocol: "TCP", + Name: strings.ToLower(processing.Name) + id, + Port: execute.Port, + TargetPort: execute.PAT, + Protocol: "TCP", } service.Spec.Ports = append(service.Spec.Ports, new_port_translation) - } - - } - - return -} - -func (b *ArgoBuilder) createService(service models.Service, processing_name string, processing_id string) { - if b.Services != nil{ - b.Services.Spec.Ports = append(b.Services.Spec.Ports, service.Spec.Ports...) - }else { - b.Services = &service } - - b.addLabel(processing_name,processing_id) - + } } func (b *ArgoBuilder) addServiceToArgo() error { - service_manifest, err := yaml.Marshal(b.Services) - if err != nil { - logger.Error().Msg("Could not marshal service manifest") - return err + for _, service := range b.Services { + service_manifest, err := yaml.Marshal(service) + if err != nil { + logger.Error().Msg("Could not marshal service manifest : " + err.Error()) + return err + } + service_template := models.Template{Name: "workflow-service-pod", + Resource: models.ServiceResource{ + Action: "create", + SuccessCondition: "status.succeeded > 0", + FailureCondition: "status.failed > 3", + SetOwnerReference: true, + Manifest: string(service_manifest), + }, + } + b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, service_template) } - - service_template := models.Template{Name: "workflow-service-pod", - Resource: models.ServiceResource{ - Action: "create", - SuccessCondition: "status.succeeded > 0", - FailureCondition: "status.failed > 3", - SetOwnerReference: true, - Manifest: string(service_manifest), - }, - } - b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, service_template) - return nil -} \ No newline at end of file +} diff --git a/workflow_builder/graph.go b/workflow_builder/graph.go index 07327cf..6e1fd6c 100644 --- a/workflow_builder/graph.go +++ b/workflow_builder/graph.go @@ -26,6 +26,7 @@ func (w *WorflowDB) getWorkflow(workflow_id string) (workflow *workflow.Workflow logger := oclib.GetLogger() lib_data := oclib.LoadOne(oclib.LibDataEnum(oclib.WORKFLOW), workflow_id) + fmt.Println(lib_data.Code, lib_data.Err) if lib_data.Code != 200 { logger.Error().Msg("Error loading the graph") return workflow, errors.New(lib_data.Err) @@ -39,20 +40,20 @@ func (w *WorflowDB) getWorkflow(workflow_id string) (workflow *workflow.Workflow return new_wf, nil } -func (w *WorflowDB) ExportToArgo(timeout int) (string, error) { +func (w *WorflowDB) ExportToArgo(timeout int) (string, int, error) { logger := oclib.GetLogger() if len(w.Workflow.Name) == 0 || w.Workflow.Graph == nil { - return "", fmt.Errorf("can't export a graph that has not been loaded yet") + return "", 0, fmt.Errorf("can't export a graph that has not been loaded yet") } - argo_builder := ArgoBuilder{OriginWorkflow: *w.Workflow, Timeout: timeout} - filename, err := argo_builder.CreateDAG() + argo_builder := ArgoBuilder{OriginWorkflow: w.Workflow, Timeout: timeout} + filename, stepMax, _, _, err := argo_builder.CreateDAG(true) if err != nil { logger.Error().Msg("Could not create the argo file for " + w.Workflow.Name) - return "", err + return "", 0, err } - return filename, nil + return filename, stepMax, nil } // TODO implement this function