2 Commits

Author SHA1 Message Date
mr
dc0041999d Change bus 2026-04-14 12:46:22 +02:00
mr
a653f9495b lock caller 2026-04-13 15:08:06 +02:00
3 changed files with 81 additions and 4 deletions

View File

@@ -0,0 +1,60 @@
package utils
import (
"sync"
"cloud.o-forge.io/core/oc-lib/tools"
)
// ChangeEvent is fired whenever a DB object is created, updated or deleted
// within this process. Deleted=true means the object was removed; Object is
// the last known snapshot before deletion.
type ChangeEvent struct {
DataType tools.DataType
ID string
Object ShallowDBObject // nil only when the load after the write failed
Deleted bool
}
var (
changeBusMu sync.RWMutex
changeBus = map[tools.DataType][]chan ChangeEvent{}
)
// SubscribeChanges returns a channel that receives ChangeEvents for dt
// whenever an object of that type is written or deleted in this process.
// Call the returned cancel function to unsubscribe; after that the channel
// will no longer receive events (it is not closed — use a context to stop
// reading).
func SubscribeChanges(dt tools.DataType) (<-chan ChangeEvent, func()) {
ch := make(chan ChangeEvent, 32)
changeBusMu.Lock()
changeBus[dt] = append(changeBus[dt], ch)
changeBusMu.Unlock()
return ch, func() {
changeBusMu.Lock()
subs := changeBus[dt]
for i, c := range subs {
if c == ch {
changeBus[dt] = append(subs[:i], subs[i+1:]...)
break
}
}
changeBusMu.Unlock()
}
}
// NotifyChange broadcasts a ChangeEvent to all current subscribers for dt.
// Non-blocking: events are dropped for subscribers whose buffer is full.
func NotifyChange(dt tools.DataType, id string, obj ShallowDBObject, deleted bool) {
changeBusMu.RLock()
subs := changeBus[dt]
changeBusMu.RUnlock()
evt := ChangeEvent{DataType: dt, ID: id, Object: obj, Deleted: deleted}
for _, ch := range subs {
select {
case ch <- evt:
default:
}
}
}

View File

@@ -63,7 +63,11 @@ func GenericStoreOne(data DBObject, a Accessor) (DBObject, int, error) {
a.GetLogger().Error().Msg("Could not store " + data.GetName() + " to db. Error: " + err.Error()) a.GetLogger().Error().Msg("Could not store " + data.GetName() + " to db. Error: " + err.Error())
return nil, code, err return nil, code, err
} }
return a.LoadOne(id) result, rcode, rerr := a.LoadOne(id)
if rerr == nil && result != nil {
go NotifyChange(a.GetType(), result.GetID(), result, false)
}
return result, rcode, rerr
} }
// GenericLoadOne loads one object from the database (generic) // GenericLoadOne loads one object from the database (generic)
@@ -86,6 +90,7 @@ func GenericDeleteOne(id string, a Accessor) (DBObject, int, error) {
a.GetLogger().Error().Msg("Could not delete " + id + " to db. Error: " + err.Error()) a.GetLogger().Error().Msg("Could not delete " + id + " to db. Error: " + err.Error())
return nil, code, err return nil, code, err
} }
go NotifyChange(a.GetType(), id, res, true)
return res, 200, nil return res, 200, nil
} }
@@ -142,7 +147,11 @@ func GenericUpdateOne(change map[string]interface{}, id string, a Accessor) (DBO
a.GetLogger().Error().Msg("Could not update " + id + " to db. Error: " + err.Error()) a.GetLogger().Error().Msg("Could not update " + id + " to db. Error: " + err.Error())
return nil, code, err return nil, code, err
} }
return a.LoadOne(id) result, rcode, rerr := a.LoadOne(id)
if rerr == nil && result != nil {
go NotifyChange(a.GetType(), result.GetID(), result, false)
}
return result, rcode, rerr
} }
func GenericLoadOne[T DBObject](id string, data T, f func(DBObject) (DBObject, int, error), a Accessor) (DBObject, int, error) { func GenericLoadOne[T DBObject](id string, data T, f func(DBObject) (DBObject, int, error), a Accessor) (DBObject, int, error) {
@@ -210,7 +219,11 @@ func GenericRawUpdateOne(set DBObject, id string, a Accessor) (DBObject, int, er
a.GetLogger().Error().Msg("Could not update " + id + " to db. Error: " + err.Error()) a.GetLogger().Error().Msg("Could not update " + id + " to db. Error: " + err.Error())
return nil, code, err return nil, code, err
} }
return a.LoadOne(id) result, rcode, rerr := a.LoadOne(id)
if rerr == nil && result != nil {
go NotifyChange(a.GetType(), result.GetID(), result, false)
}
return result, rcode, rerr
} }
func GetMySelf(wfa Accessor) (ShallowDBObject, error) { func GetMySelf(wfa Accessor) (ShallowDBObject, error) {

View File

@@ -8,6 +8,7 @@ import (
"net/http" "net/http"
"net/url" "net/url"
"strings" "strings"
"sync"
) )
// HTTP Method Enum defines the different methods that can be used to interact with the HTTP server // HTTP Method Enum defines the different methods that can be used to interact with the HTTP server
@@ -57,7 +58,8 @@ type HTTPCallerITF interface {
type HTTPCaller struct { type HTTPCaller struct {
URLS map[DataType]map[METHOD]string // Map of the different methods and their urls URLS map[DataType]map[METHOD]string // Map of the different methods and their urls
Disabled bool // Disabled flag Disabled bool // Disabled flag
LastResults map[string]interface{} // Used to store information regarding the last execution of a given method on a given data type Mu sync.RWMutex
LastResults map[string]interface{} // Used to store information regarding the last execution of a given method on a given data type
} }
// NewHTTPCaller creates a new instance of the HTTP Caller // NewHTTPCaller creates a new instance of the HTTP Caller
@@ -217,6 +219,8 @@ func (caller *HTTPCaller) CallForm(method string, url string, subpath string,
} }
func (caller *HTTPCaller) StoreResp(resp *http.Response) error { func (caller *HTTPCaller) StoreResp(resp *http.Response) error {
caller.Mu.Lock()
defer caller.Mu.Unlock()
caller.LastResults = make(map[string]interface{}) caller.LastResults = make(map[string]interface{})
caller.LastResults["header"] = resp.Header caller.LastResults["header"] = resp.Header
caller.LastResults["code"] = resp.StatusCode caller.LastResults["code"] = resp.StatusCode