Structured NATS

This commit is contained in:
mr
2026-01-28 15:49:21 +01:00
parent 1c9d7b63c0
commit e3fe49c239
2 changed files with 18 additions and 14 deletions

View File

@@ -61,11 +61,9 @@ func (s State) String() string {
type API struct{} type API struct{}
func (a *API) Discovered(infos []*beego.ControllerInfo) { func (a *API) Discovered(infos []*beego.ControllerInfo) {
respondToDiscovery := func(m map[string]string) { respondToDiscovery := func(m NATSResponse) {
if len(m) == 0 {
a.SubscribeRouter(infos) a.SubscribeRouter(infos)
} }
}
a.ListenRouter(respondToDiscovery) a.ListenRouter(respondToDiscovery)
a.SubscribeRouter(infos) a.SubscribeRouter(infos)
} }
@@ -91,8 +89,8 @@ func (a *API) GetState() (State, int, error) {
return ALIVE, 200, nil // If everything is up, return alive return ALIVE, 200, nil // If everything is up, return alive
} }
func (a *API) ListenRouter(exec func(msg map[string]string)) { func (a *API) ListenRouter(exec func(msg NATSResponse)) {
go NewNATSCaller().ListenNats(map[NATSMethod]func(msg map[string]string){ go NewNATSCaller().ListenNats(map[NATSMethod]func(msg NATSResponse){
DISCOVERY: exec, DISCOVERY: exec,
}) })
} }
@@ -113,7 +111,14 @@ func (a *API) SubscribeRouter(infos []*beego.ControllerInfo) {
} }
} }
} }
go nats.SetNATSPub(DISCOVERY, discovery) b, _ := json.Marshal(discovery)
go nats.SetNATSPub(DISCOVERY, NATSResponse{
FromApp: beego.AppPath,
Datatype: -1,
Method: int(DISCOVERY),
Payload: b,
})
} }
// CheckRemotePeer checks the state of a remote peer // CheckRemotePeer checks the state of a remote peer

View File

@@ -2,7 +2,6 @@ package tools
import ( import (
"encoding/json" "encoding/json"
"fmt"
"strings" "strings"
"sync" "sync"
"time" "time"
@@ -67,7 +66,7 @@ func NewNATSCaller() *natsCaller {
// on workflows' scheduling. Messages must contain // on workflows' scheduling. Messages must contain
// workflow execution ID, to allow retrieval of execution infos // workflow execution ID, to allow retrieval of execution infos
func (s *natsCaller) ListenNats(execs map[NATSMethod]func(map[string]string)) { func (s *natsCaller) ListenNats(execs map[NATSMethod]func(NATSResponse)) {
log := logs.GetLogger() log := logs.GetLogger()
if config.GetConfig().NATSUrl == "" { if config.GetConfig().NATSUrl == "" {
log.Error().Msg(" -> NATS_SERVER is not set") log.Error().Msg(" -> NATS_SERVER is not set")
@@ -121,7 +120,7 @@ func (o *natsCaller) SetNATSPub(method NATSMethod, data NATSResponse) string {
// on workflows' scheduling. Messages must contain // on workflows' scheduling. Messages must contain
// workflow execution ID, to allow retrieval of execution infos // workflow execution ID, to allow retrieval of execution infos
func (o *natsCaller) listenForChange(logger zerolog.Logger, nc *nats.Conn, natsTools NATSMethod, func (o *natsCaller) listenForChange(logger zerolog.Logger, nc *nats.Conn, natsTools NATSMethod,
function func(map[string]string), wg *sync.WaitGroup) { function func(NATSResponse), wg *sync.WaitGroup) {
defer wg.Done() defer wg.Done()
ch := make(chan *nats.Msg, 64) ch := make(chan *nats.Msg, 64)
logger.Info().Msg("Listening to " + natsTools.GenerateKey()) logger.Info().Msg("Listening to " + natsTools.GenerateKey())
@@ -132,9 +131,9 @@ func (o *natsCaller) listenForChange(logger zerolog.Logger, nc *nats.Conn, natsT
defer subs.Unsubscribe() defer subs.Unsubscribe()
for msg := range ch { for msg := range ch {
map_mess := map[string]string{} var resp NATSResponse
json.Unmarshal(msg.Data, &map_mess) json.Unmarshal(msg.Data, &resp)
fmt.Println("Catching " + natsTools.String() + " workflow... " + map_mess["id"]) logger.Info().Msg("Catching " + natsTools.String() + "... " + resp.FromApp + " - " + resp.Datatype.String())
function(map_mess) function(resp)
} }
} }