diff --git a/tools/api.go b/tools/api.go index 7bffe81..bbe0a69 100644 --- a/tools/api.go +++ b/tools/api.go @@ -61,10 +61,8 @@ func (s State) String() string { type API struct{} func (a *API) Discovered(infos []*beego.ControllerInfo) { - respondToDiscovery := func(m map[string]string) { - if len(m) == 0 { - a.SubscribeRouter(infos) - } + respondToDiscovery := func(m NATSResponse) { + a.SubscribeRouter(infos) } a.ListenRouter(respondToDiscovery) a.SubscribeRouter(infos) @@ -91,8 +89,8 @@ func (a *API) GetState() (State, int, error) { return ALIVE, 200, nil // If everything is up, return alive } -func (a *API) ListenRouter(exec func(msg map[string]string)) { - go NewNATSCaller().ListenNats(map[NATSMethod]func(msg map[string]string){ +func (a *API) ListenRouter(exec func(msg NATSResponse)) { + go NewNATSCaller().ListenNats(map[NATSMethod]func(msg NATSResponse){ DISCOVERY: exec, }) } @@ -113,7 +111,14 @@ func (a *API) SubscribeRouter(infos []*beego.ControllerInfo) { } } } - go nats.SetNATSPub(DISCOVERY, discovery) + b, _ := json.Marshal(discovery) + + go nats.SetNATSPub(DISCOVERY, NATSResponse{ + FromApp: beego.AppPath, + Datatype: -1, + Method: int(DISCOVERY), + Payload: b, + }) } // CheckRemotePeer checks the state of a remote peer diff --git a/tools/nats_caller.go b/tools/nats_caller.go index b5ba946..549dcf1 100644 --- a/tools/nats_caller.go +++ b/tools/nats_caller.go @@ -2,7 +2,6 @@ package tools import ( "encoding/json" - "fmt" "strings" "sync" "time" @@ -67,7 +66,7 @@ func NewNATSCaller() *natsCaller { // on workflows' scheduling. Messages must contain // workflow execution ID, to allow retrieval of execution infos -func (s *natsCaller) ListenNats(execs map[NATSMethod]func(map[string]string)) { +func (s *natsCaller) ListenNats(execs map[NATSMethod]func(NATSResponse)) { log := logs.GetLogger() if config.GetConfig().NATSUrl == "" { log.Error().Msg(" -> NATS_SERVER is not set") @@ -121,7 +120,7 @@ func (o *natsCaller) SetNATSPub(method NATSMethod, data NATSResponse) string { // on workflows' scheduling. Messages must contain // workflow execution ID, to allow retrieval of execution infos func (o *natsCaller) listenForChange(logger zerolog.Logger, nc *nats.Conn, natsTools NATSMethod, - function func(map[string]string), wg *sync.WaitGroup) { + function func(NATSResponse), wg *sync.WaitGroup) { defer wg.Done() ch := make(chan *nats.Msg, 64) logger.Info().Msg("Listening to " + natsTools.GenerateKey()) @@ -132,9 +131,9 @@ func (o *natsCaller) listenForChange(logger zerolog.Logger, nc *nats.Conn, natsT defer subs.Unsubscribe() for msg := range ch { - map_mess := map[string]string{} - json.Unmarshal(msg.Data, &map_mess) - fmt.Println("Catching " + natsTools.String() + " workflow... " + map_mess["id"]) - function(map_mess) + var resp NATSResponse + json.Unmarshal(msg.Data, &resp) + logger.Info().Msg("Catching " + natsTools.String() + "... " + resp.FromApp + " - " + resp.Datatype.String()) + function(resp) } }