Configuration NATS

This commit is contained in:
mr
2026-01-27 09:35:47 +01:00
parent 802786daa7
commit 643beacd4b
3 changed files with 63 additions and 34 deletions

View File

@@ -9,14 +9,16 @@ import "sync"
// ===================================================
type Config struct {
NATSUrl string
MongoUrl string
MongoDatabase string
Host string
Port string
LokiUrl string
LogLevel string
Whitelist bool
NATSUrl string
MongoUrl string
MongoDatabase string
Host string
Port string
LokiUrl string
LogLevel string
Whitelist bool
PrivateKeyPath string
PublicKeyPath string
}
func (c Config) GetUrl() string {

View File

@@ -61,7 +61,7 @@ func (s State) String() string {
type API struct{}
func (a *API) Discovered(infos []*beego.ControllerInfo) {
respondToDiscovery := func(m map[string]interface{}) {
respondToDiscovery := func(m map[string]string) {
if len(m) == 0 {
a.SubscribeRouter(infos)
}
@@ -91,8 +91,10 @@ func (a *API) GetState() (State, int, error) {
return ALIVE, 200, nil // If everything is up, return alive
}
func (a *API) ListenRouter(exec func(msg map[string]interface{})) {
go NewNATSCaller().ListenNats(DISCOVERY.GenerateKey("api"), exec)
func (a *API) ListenRouter(exec func(msg map[string]string)) {
go NewNATSCaller().ListenNats(DISCOVERY.GenerateKey(), map[NATSMethod]func(msg map[string]string){
DISCOVERY: exec,
})
}
func (a *API) SubscribeRouter(infos []*beego.ControllerInfo) {

View File

@@ -2,27 +2,39 @@ package tools
import (
"encoding/json"
"fmt"
"strings"
"sync"
"time"
"cloud.o-forge.io/core/oc-lib/config"
"cloud.o-forge.io/core/oc-lib/logs"
"github.com/nats-io/nats.go"
"github.com/rs/zerolog"
)
// NATS Method Enum defines the different methods that can be used to interact with the NATS server
type NATSMethod int
var meths = []string{"remove execution", "create execution", "discovery",
"workflow event", "peer discovery", "peer detection"}
const (
REMOVE NATSMethod = iota
CREATE
REMOVE_EXECUTION NATSMethod = iota
CREATE_EXECTUTION
DISCOVERY
WORKFLOW_EVENT
PEER_DISCOVERY
PEER_DETECTION
)
func (n NATSMethod) String() string {
return meths[n]
}
// NameToMethod returns the NATSMethod enum value from a string
func NameToMethod(name string) NATSMethod {
for _, v := range [...]NATSMethod{REMOVE, CREATE} {
for _, v := range [...]NATSMethod{REMOVE_EXECUTION, CREATE_EXECTUTION, DISCOVERY, WORKFLOW_EVENT, PEER_DISCOVERY, PEER_DETECTION} {
if strings.Contains(strings.ToLower(v.String()), strings.ToLower(name)) {
return v
}
@@ -31,13 +43,8 @@ func NameToMethod(name string) NATSMethod {
}
// GenerateKey generates a key for the NATSMethod usefull for standard key based on data name & method
func (d NATSMethod) GenerateKey(name string) string {
return name + "_" + d.String()
}
// String returns the string of the enum
func (d NATSMethod) String() string {
return [...]string{"remove", "create", "discovery"}[d]
func (d NATSMethod) GenerateKey() string {
return strings.ReplaceAll(d.String(), " ", "_")
}
type natsCaller struct{}
@@ -49,7 +56,7 @@ func NewNATSCaller() *natsCaller {
// on workflows' scheduling. Messages must contain
// workflow execution ID, to allow retrieval of execution infos
func (s *natsCaller) ListenNats(chanName string, exec func(msg map[string]interface{})) {
func (s *natsCaller) ListenNats(chanName string, execs map[NATSMethod]func(map[string]string)) {
log := logs.GetLogger()
if config.GetConfig().NATSUrl == "" {
log.Error().Msg(" -> NATS_SERVER is not set")
@@ -58,21 +65,17 @@ func (s *natsCaller) ListenNats(chanName string, exec func(msg map[string]interf
for {
nc, err := nats.Connect(config.GetConfig().NATSUrl)
if err != nil {
log.Error().Msg("Could not connect to NATS")
time.Sleep(1 * time.Minute)
continue
}
ch := make(chan *nats.Msg, 64)
subs, err := nc.ChanSubscribe(chanName, ch)
if err != nil {
log.Error().Msg("Error listening to NATS : " + err.Error())
}
defer subs.Unsubscribe()
for msg := range ch {
map_mess := map[string]interface{}{}
json.Unmarshal(msg.Data, &map_mess)
exec(map_mess)
defer nc.Close()
var wg sync.WaitGroup
wg.Add(len(execs))
for k, v := range execs {
go s.listenForChange(log, nc, k, v, &wg)
}
wg.Wait()
break
}
}
@@ -93,7 +96,7 @@ func (o *natsCaller) SetNATSPub(dataName string, method NATSMethod, data interfa
if err != nil {
return " -> " + err.Error()
}
err = nc.Publish(method.GenerateKey(dataName), js) // Publish the message on the NATS server with a channel name based on the data name (or whatever start) and the method
err = nc.Publish(method.GenerateKey(), js) // Publish the message on the NATS server with a channel name based on the data name (or whatever start) and the method
if err != nil {
time.Sleep(1 * time.Minute)
continue
@@ -102,3 +105,25 @@ func (o *natsCaller) SetNATSPub(dataName string, method NATSMethod, data interfa
}
return ""
}
// Goroutine listening to a NATS server for updates
// on workflows' scheduling. Messages must contain
// workflow execution ID, to allow retrieval of execution infos
func (o *natsCaller) listenForChange(logger zerolog.Logger, nc *nats.Conn, natsTools NATSMethod,
function func(map[string]string), wg *sync.WaitGroup) {
defer wg.Done()
ch := make(chan *nats.Msg, 64)
logger.Info().Msg("Listening to " + natsTools.GenerateKey())
subs, err := nc.ChanSubscribe(natsTools.GenerateKey(), ch)
if err != nil {
logger.Error().Msg("Error listening to NATS : " + err.Error())
}
defer subs.Unsubscribe()
for msg := range ch {
map_mess := map[string]string{}
json.Unmarshal(msg.Data, &map_mess)
fmt.Println("Catching " + natsTools.String() + " workflow... " + map_mess["id"])
function(map_mess)
}
}