2024-07-11 11:40:11 +02:00
|
|
|
// @APIVersion 1.0.0
|
2024-10-15 10:51:12 +02:00
|
|
|
// @Title oc-peer
|
2024-09-04 17:23:30 +02:00
|
|
|
// @Description Manage OpenCloud peers
|
|
|
|
|
// @Contact admin@o-cloud.io
|
|
|
|
|
// @TermsOfServiceUrl http://cloud.o-forge.io/
|
2024-10-03 09:50:29 +02:00
|
|
|
// @License AGPL
|
|
|
|
|
// @LicenseUrl https://www.gnu.org/licenses/agpl-3.0.html
|
2024-07-11 11:40:11 +02:00
|
|
|
package routers
|
|
|
|
|
|
|
|
|
|
import (
|
2026-03-04 11:58:13 +01:00
|
|
|
"encoding/json"
|
2026-04-29 11:43:52 +02:00
|
|
|
"fmt"
|
2026-03-04 11:58:13 +01:00
|
|
|
"net/http"
|
2024-10-15 10:51:12 +02:00
|
|
|
"oc-peer/controllers"
|
2026-03-04 11:58:13 +01:00
|
|
|
"oc-peer/infrastructure"
|
|
|
|
|
"strings"
|
2026-04-29 11:43:52 +02:00
|
|
|
"time"
|
2024-07-11 11:40:11 +02:00
|
|
|
|
2026-03-04 11:58:13 +01:00
|
|
|
oclib "cloud.o-forge.io/core/oc-lib"
|
|
|
|
|
"cloud.o-forge.io/core/oc-lib/tools"
|
2024-07-11 11:40:11 +02:00
|
|
|
beego "github.com/beego/beego/v2/server/web"
|
2026-04-01 11:27:05 +02:00
|
|
|
"github.com/gorilla/websocket"
|
2024-07-11 11:40:11 +02:00
|
|
|
)
|
|
|
|
|
|
2026-04-01 11:27:05 +02:00
|
|
|
var upgrader = websocket.Upgrader{
|
|
|
|
|
CheckOrigin: func(r *http.Request) bool {
|
|
|
|
|
return true
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
|
2024-07-11 11:40:11 +02:00
|
|
|
func init() {
|
2026-04-01 11:27:05 +02:00
|
|
|
ns := beego.NewNamespace("/oc",
|
2024-08-26 12:07:54 +02:00
|
|
|
beego.NSNamespace("/status",
|
2024-08-06 11:09:38 +02:00
|
|
|
beego.NSInclude(
|
2024-08-21 16:25:24 +02:00
|
|
|
&controllers.StatusController{},
|
|
|
|
|
),
|
|
|
|
|
),
|
2024-10-15 10:51:12 +02:00
|
|
|
beego.NSInclude(
|
|
|
|
|
&controllers.PeerController{},
|
2024-08-06 11:09:38 +02:00
|
|
|
),
|
2024-07-11 11:40:11 +02:00
|
|
|
beego.NSNamespace("/version",
|
|
|
|
|
beego.NSInclude(
|
|
|
|
|
&controllers.VersionController{},
|
|
|
|
|
),
|
|
|
|
|
),
|
|
|
|
|
)
|
|
|
|
|
beego.AddNamespace(ns)
|
2026-03-04 11:58:13 +01:00
|
|
|
|
2026-04-29 11:43:52 +02:00
|
|
|
// WebSocket — peer search (returns found peers + online/offline updates for them)
|
2026-03-04 11:58:13 +01:00
|
|
|
beego.Handler("/oc/decentralized/search/:search", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
2026-04-01 11:27:05 +02:00
|
|
|
conn, err := upgrader.Upgrade(w, r, nil)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
defer conn.Close()
|
2026-03-04 11:58:13 +01:00
|
|
|
parts := strings.Split(strings.TrimSuffix(r.URL.Path, "/"), "/")
|
|
|
|
|
search := parts[len(parts)-1]
|
|
|
|
|
|
2026-04-29 11:43:52 +02:00
|
|
|
user, _, groups := oclib.ExtractTokenInfoWs(*r)
|
2026-03-04 11:58:13 +01:00
|
|
|
b, _ := json.Marshal(map[string]string{"search": search})
|
2026-03-12 09:10:14 +01:00
|
|
|
infrastructure.EmitNATS(user, groups, tools.PropalgationMessage{
|
2026-03-04 11:58:13 +01:00
|
|
|
Action: tools.PB_SEARCH,
|
|
|
|
|
DataType: tools.PEER.EnumIndex(),
|
|
|
|
|
Payload: b,
|
|
|
|
|
})
|
2026-04-29 11:43:52 +02:00
|
|
|
fmt.Println("SEARCH", search)
|
2026-04-01 11:27:05 +02:00
|
|
|
controllers.Websocket(r.Context(), user, conn)
|
2026-03-04 11:58:13 +01:00
|
|
|
}))
|
2026-04-29 11:43:52 +02:00
|
|
|
|
|
|
|
|
// WebSocket — dedicated online/offline feed, user-scoped.
|
|
|
|
|
//
|
|
|
|
|
// The frontend sends lists of ShallowPeer to watch; oc-peer forwards them
|
|
|
|
|
// to oc-discovery via NATS (with user identity). On any disconnect or error
|
|
|
|
|
// oc-peer emits NATS close for all peers watched in this session.
|
|
|
|
|
beego.Handler("/oc/decentralized/observe", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
|
|
|
conn, err := upgrader.Upgrade(w, r, nil)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
user, _, _ := oclib.ExtractTokenInfoWs(*r)
|
|
|
|
|
connID := fmt.Sprintf("%p", conn)
|
|
|
|
|
ch := make(chan infrastructure.WSMessage, 64)
|
|
|
|
|
fmt.Println("SUBSCRIBED TO", connID)
|
|
|
|
|
infrastructure.RegisterObserveSession(connID, user, ch)
|
|
|
|
|
done := make(chan struct{})
|
|
|
|
|
defer func() {
|
|
|
|
|
infrastructure.CloseObserveSession(connID)
|
|
|
|
|
close(done)
|
|
|
|
|
conn.Close()
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
const pongWait = 60 * time.Second
|
|
|
|
|
const pingPeriod = 50 * time.Second
|
|
|
|
|
const writeWait = 10 * time.Second
|
|
|
|
|
|
|
|
|
|
conn.SetReadDeadline(time.Now().Add(pongWait))
|
|
|
|
|
conn.SetPongHandler(func(string) error {
|
|
|
|
|
return conn.SetReadDeadline(time.Now().Add(pongWait))
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
// Write loop: forward state-change messages and send keepalive pings.
|
|
|
|
|
go func() {
|
|
|
|
|
ticker := time.NewTicker(pingPeriod)
|
|
|
|
|
defer ticker.Stop()
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case msg := <-ch:
|
|
|
|
|
fmt.Println("ONLINE", msg)
|
|
|
|
|
conn.SetWriteDeadline(time.Now().Add(writeWait))
|
|
|
|
|
if conn.WriteJSON(msg) != nil {
|
2026-04-30 14:17:47 +02:00
|
|
|
infrastructure.CloseObserveSession(connID)
|
2026-04-29 11:43:52 +02:00
|
|
|
conn.Close()
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
case <-ticker.C:
|
|
|
|
|
conn.SetWriteDeadline(time.Now().Add(writeWait))
|
|
|
|
|
if conn.WriteMessage(websocket.PingMessage, nil) != nil {
|
2026-04-30 14:17:47 +02:00
|
|
|
infrastructure.CloseObserveSession(connID)
|
2026-04-29 11:43:52 +02:00
|
|
|
conn.Close()
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
case <-done:
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
// Read loop: receive peer lists from the frontend.
|
|
|
|
|
var req struct {
|
|
|
|
|
Peers []infrastructure.ShallowPeer `json:"peers"`
|
|
|
|
|
}
|
|
|
|
|
for {
|
|
|
|
|
if err := conn.ReadJSON(&req); err != nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
// AddObservedPeers deduplicates, emits NATS observe, returns snapshot.
|
|
|
|
|
for _, msg := range infrastructure.AddObservedPeers(connID, req.Peers) {
|
|
|
|
|
select {
|
|
|
|
|
case ch <- msg:
|
|
|
|
|
default:
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}))
|
2024-07-11 11:40:11 +02:00
|
|
|
}
|