Compare commits
2 Commits
d7b2ef6ae1
...
dc0041999d
| Author | SHA1 | Date | |
|---|---|---|---|
| dc0041999d | |||
| a653f9495b |
60
models/utils/change_bus.go
Normal file
60
models/utils/change_bus.go
Normal file
@@ -0,0 +1,60 @@
|
|||||||
|
package utils
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"cloud.o-forge.io/core/oc-lib/tools"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ChangeEvent is fired whenever a DB object is created, updated or deleted
|
||||||
|
// within this process. Deleted=true means the object was removed; Object is
|
||||||
|
// the last known snapshot before deletion.
|
||||||
|
type ChangeEvent struct {
|
||||||
|
DataType tools.DataType
|
||||||
|
ID string
|
||||||
|
Object ShallowDBObject // nil only when the load after the write failed
|
||||||
|
Deleted bool
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
changeBusMu sync.RWMutex
|
||||||
|
changeBus = map[tools.DataType][]chan ChangeEvent{}
|
||||||
|
)
|
||||||
|
|
||||||
|
// SubscribeChanges returns a channel that receives ChangeEvents for dt
|
||||||
|
// whenever an object of that type is written or deleted in this process.
|
||||||
|
// Call the returned cancel function to unsubscribe; after that the channel
|
||||||
|
// will no longer receive events (it is not closed — use a context to stop
|
||||||
|
// reading).
|
||||||
|
func SubscribeChanges(dt tools.DataType) (<-chan ChangeEvent, func()) {
|
||||||
|
ch := make(chan ChangeEvent, 32)
|
||||||
|
changeBusMu.Lock()
|
||||||
|
changeBus[dt] = append(changeBus[dt], ch)
|
||||||
|
changeBusMu.Unlock()
|
||||||
|
return ch, func() {
|
||||||
|
changeBusMu.Lock()
|
||||||
|
subs := changeBus[dt]
|
||||||
|
for i, c := range subs {
|
||||||
|
if c == ch {
|
||||||
|
changeBus[dt] = append(subs[:i], subs[i+1:]...)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
changeBusMu.Unlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NotifyChange broadcasts a ChangeEvent to all current subscribers for dt.
|
||||||
|
// Non-blocking: events are dropped for subscribers whose buffer is full.
|
||||||
|
func NotifyChange(dt tools.DataType, id string, obj ShallowDBObject, deleted bool) {
|
||||||
|
changeBusMu.RLock()
|
||||||
|
subs := changeBus[dt]
|
||||||
|
changeBusMu.RUnlock()
|
||||||
|
evt := ChangeEvent{DataType: dt, ID: id, Object: obj, Deleted: deleted}
|
||||||
|
for _, ch := range subs {
|
||||||
|
select {
|
||||||
|
case ch <- evt:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -63,7 +63,11 @@ func GenericStoreOne(data DBObject, a Accessor) (DBObject, int, error) {
|
|||||||
a.GetLogger().Error().Msg("Could not store " + data.GetName() + " to db. Error: " + err.Error())
|
a.GetLogger().Error().Msg("Could not store " + data.GetName() + " to db. Error: " + err.Error())
|
||||||
return nil, code, err
|
return nil, code, err
|
||||||
}
|
}
|
||||||
return a.LoadOne(id)
|
result, rcode, rerr := a.LoadOne(id)
|
||||||
|
if rerr == nil && result != nil {
|
||||||
|
go NotifyChange(a.GetType(), result.GetID(), result, false)
|
||||||
|
}
|
||||||
|
return result, rcode, rerr
|
||||||
}
|
}
|
||||||
|
|
||||||
// GenericLoadOne loads one object from the database (generic)
|
// GenericLoadOne loads one object from the database (generic)
|
||||||
@@ -86,6 +90,7 @@ func GenericDeleteOne(id string, a Accessor) (DBObject, int, error) {
|
|||||||
a.GetLogger().Error().Msg("Could not delete " + id + " to db. Error: " + err.Error())
|
a.GetLogger().Error().Msg("Could not delete " + id + " to db. Error: " + err.Error())
|
||||||
return nil, code, err
|
return nil, code, err
|
||||||
}
|
}
|
||||||
|
go NotifyChange(a.GetType(), id, res, true)
|
||||||
return res, 200, nil
|
return res, 200, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -142,7 +147,11 @@ func GenericUpdateOne(change map[string]interface{}, id string, a Accessor) (DBO
|
|||||||
a.GetLogger().Error().Msg("Could not update " + id + " to db. Error: " + err.Error())
|
a.GetLogger().Error().Msg("Could not update " + id + " to db. Error: " + err.Error())
|
||||||
return nil, code, err
|
return nil, code, err
|
||||||
}
|
}
|
||||||
return a.LoadOne(id)
|
result, rcode, rerr := a.LoadOne(id)
|
||||||
|
if rerr == nil && result != nil {
|
||||||
|
go NotifyChange(a.GetType(), result.GetID(), result, false)
|
||||||
|
}
|
||||||
|
return result, rcode, rerr
|
||||||
}
|
}
|
||||||
|
|
||||||
func GenericLoadOne[T DBObject](id string, data T, f func(DBObject) (DBObject, int, error), a Accessor) (DBObject, int, error) {
|
func GenericLoadOne[T DBObject](id string, data T, f func(DBObject) (DBObject, int, error), a Accessor) (DBObject, int, error) {
|
||||||
@@ -210,7 +219,11 @@ func GenericRawUpdateOne(set DBObject, id string, a Accessor) (DBObject, int, er
|
|||||||
a.GetLogger().Error().Msg("Could not update " + id + " to db. Error: " + err.Error())
|
a.GetLogger().Error().Msg("Could not update " + id + " to db. Error: " + err.Error())
|
||||||
return nil, code, err
|
return nil, code, err
|
||||||
}
|
}
|
||||||
return a.LoadOne(id)
|
result, rcode, rerr := a.LoadOne(id)
|
||||||
|
if rerr == nil && result != nil {
|
||||||
|
go NotifyChange(a.GetType(), result.GetID(), result, false)
|
||||||
|
}
|
||||||
|
return result, rcode, rerr
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetMySelf(wfa Accessor) (ShallowDBObject, error) {
|
func GetMySelf(wfa Accessor) (ShallowDBObject, error) {
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
// HTTP Method Enum defines the different methods that can be used to interact with the HTTP server
|
// HTTP Method Enum defines the different methods that can be used to interact with the HTTP server
|
||||||
@@ -57,7 +58,8 @@ type HTTPCallerITF interface {
|
|||||||
type HTTPCaller struct {
|
type HTTPCaller struct {
|
||||||
URLS map[DataType]map[METHOD]string // Map of the different methods and their urls
|
URLS map[DataType]map[METHOD]string // Map of the different methods and their urls
|
||||||
Disabled bool // Disabled flag
|
Disabled bool // Disabled flag
|
||||||
LastResults map[string]interface{} // Used to store information regarding the last execution of a given method on a given data type
|
Mu sync.RWMutex
|
||||||
|
LastResults map[string]interface{} // Used to store information regarding the last execution of a given method on a given data type
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewHTTPCaller creates a new instance of the HTTP Caller
|
// NewHTTPCaller creates a new instance of the HTTP Caller
|
||||||
@@ -217,6 +219,8 @@ func (caller *HTTPCaller) CallForm(method string, url string, subpath string,
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (caller *HTTPCaller) StoreResp(resp *http.Response) error {
|
func (caller *HTTPCaller) StoreResp(resp *http.Response) error {
|
||||||
|
caller.Mu.Lock()
|
||||||
|
defer caller.Mu.Unlock()
|
||||||
caller.LastResults = make(map[string]interface{})
|
caller.LastResults = make(map[string]interface{})
|
||||||
caller.LastResults["header"] = resp.Header
|
caller.LastResults["header"] = resp.Header
|
||||||
caller.LastResults["code"] = resp.StatusCode
|
caller.LastResults["code"] = resp.StatusCode
|
||||||
|
|||||||
Reference in New Issue
Block a user