Set up -> Stream is Working

This commit is contained in:
mr
2026-02-03 15:25:15 +01:00
parent 0ff21c0818
commit 1c2ea9ca96
11 changed files with 259 additions and 179 deletions

View File

@@ -52,17 +52,17 @@ func (ix *LongLivedStreamRecordedService[T]) StartGC(interval time.Duration) {
func (ix *LongLivedStreamRecordedService[T]) gc() {
ix.StreamMU.Lock()
defer ix.StreamMU.Unlock()
now := time.Now()
streams := ix.StreamRecords[ProtocolHeartbeat]
if streams == nil {
now := time.Now().UTC()
if ix.StreamRecords[ProtocolHeartbeat] == nil {
ix.StreamRecords[ProtocolHeartbeat] = map[pp.ID]*StreamRecord[T]{}
return
}
streams := ix.StreamRecords[ProtocolHeartbeat]
for pid, rec := range streams {
if now.After(rec.HeartbeatStream.Expiry) || now.Sub(rec.LastSeen) > 2*rec.HeartbeatStream.Expiry.Sub(now) {
for _, sstreams := range ix.StreamRecords {
if sstreams[pid] != nil {
sstreams[pid].Stream.Close()
delete(sstreams, pid)
}
}
@@ -111,40 +111,49 @@ func (ix *LongLivedStreamRecordedService[T]) snapshot() []*StreamRecord[T] {
}
func (ix *LongLivedStreamRecordedService[T]) HandleNodeHeartbeat(s network.Stream) {
streams := ix.StreamRecords[ProtocolHeartbeat]
pid, hb, err := CheckHeartbeat(ix.Host, s, ix.maxNodesConn)
if err != nil {
return
defer s.Close()
for {
pid, hb, err := CheckHeartbeat(ix.Host, s, ix.maxNodesConn)
if err != nil {
continue
}
ix.StreamMU.Lock()
if ix.StreamRecords[ProtocolHeartbeat] == nil {
ix.StreamRecords[ProtocolHeartbeat] = map[pp.ID]*StreamRecord[T]{}
}
streams := ix.StreamRecords[ProtocolHeartbeat]
// if record already seen update last seen
if rec, ok := streams[*pid]; ok {
rec.DID = hb.DID
rec.Stream = s
rec.HeartbeatStream = hb.Stream
rec.LastSeen = time.Now().UTC()
} else {
streams[*pid] = &StreamRecord[T]{
DID: hb.DID,
HeartbeatStream: hb.Stream,
Stream: s,
LastSeen: time.Now().UTC(),
}
}
ix.StreamMU.Unlock()
}
ix.StreamMU.Lock()
defer ix.StreamMU.Unlock()
if streams == nil {
ix.StreamRecords[ProtocolHeartbeat] = map[pp.ID]*StreamRecord[T]{}
return
}
// if record already seen update last seen
if rec, ok := streams[*pid]; ok {
rec.DID = hb.DID
rec.Stream = s
rec.HeartbeatStream = hb.Stream
rec.LastSeen = time.Unix(hb.Timestamp, 0)
}
// si je l'handle et que je ne suis pas dans une
}
func CheckHeartbeat(h host.Host, s network.Stream, maxNodes int) (*pp.ID, *Heartbeat, error) {
if len(h.Network().Peers()) >= maxNodes {
return nil, nil, fmt.Errorf("too many connections, try another indexer")
}
defer s.Close()
var hb Heartbeat
if err := json.NewDecoder(s).Decode(&hb); err != nil {
return nil, nil, err
}
pid, err := pp.Decode(hb.PeerID)
hb.Stream.Stream = s // here is the long-lived bidirectionnal heart bit.
hb.Stream = &Stream{
DID: hb.DID,
Stream: s,
Expiry: time.Now().UTC().Add(2 * time.Minute),
} // here is the long-lived bidirectionnal heart bit.
return &pid, &hb, err
}
@@ -242,7 +251,7 @@ func ConnectToIndexers(h host.Host, minIndexer int, maxIndexer int, myPID pp.ID)
StaticIndexers = append(StaticIndexers, ad)
// make a privilege streams with indexer.
for _, proto := range []protocol.ID{ProtocolPublish, ProtocolGet, ProtocolHeartbeat} {
AddStreamProtocol(nil, StreamIndexers, h, proto, ad.ID, myPID, nil)
AddStreamProtocol(nil, StreamIndexers, h, proto, ad.ID, myPID, true, nil)
}
}
if len(StaticIndexers) == 0 {
@@ -252,21 +261,21 @@ func ConnectToIndexers(h host.Host, minIndexer int, maxIndexer int, myPID pp.ID)
if len(StaticIndexers) < minIndexer {
// TODO : ask for unknown indexer.
}
SendHeartbeat(ctx, ProtocolHeartbeat, h, StreamIndexers, StaticIndexers, 20) // your indexer is just like a node for the next indexer.
SendHeartbeat(ctx, ProtocolHeartbeat, h, StreamIndexers, StaticIndexers, 20*time.Second) // your indexer is just like a node for the next indexer.
}
func AddStreamProtocol(ctx *context.Context, protoS ProtocolStream, h host.Host, proto protocol.ID, id pp.ID, mypid pp.ID, onStreamCreated *func(network.Stream)) ProtocolStream {
func AddStreamProtocol(ctx *context.Context, protoS ProtocolStream, h host.Host, proto protocol.ID, id pp.ID, mypid pp.ID, force bool, onStreamCreated *func(network.Stream)) ProtocolStream {
if onStreamCreated == nil {
f := func(s network.Stream) {
protoS[proto][id] = &Stream{
Stream: s,
Expiry: time.Now().Add(2 * time.Minute),
Expiry: time.Now().UTC().Add(2 * time.Minute),
}
}
onStreamCreated = &f
}
f := *onStreamCreated
if mypid > id {
if mypid > id || force {
if ctx == nil {
c := context.Background()
ctx = &c
@@ -278,16 +287,14 @@ func AddStreamProtocol(ctx *context.Context, protoS ProtocolStream, h host.Host,
if protoS[proto][id] != nil {
protoS[proto][id].Expiry = time.Now().Add(2 * time.Minute)
} else {
fmt.Println("GENERATE STREAM", proto, id)
s, err := h.NewStream(*ctx, id, proto)
if err != nil {
panic(err.Error())
}
f(s)
}
} else {
h.SetStreamHandler(proto, f)
}
return protoS
}
@@ -318,7 +325,7 @@ func SendHeartbeat(ctx context.Context, proto protocol.ID, h host.Host, ps Proto
hb := Heartbeat{
DID: peerID,
PeerID: h.ID().String(),
Timestamp: time.Now().Unix(),
Timestamp: time.Now().UTC().Unix(),
}
for _, ix := range peers {
_ = sendHeartbeat(ctx, h, proto, ix, hb, ps, interval*time.Second)
@@ -330,7 +337,8 @@ func SendHeartbeat(ctx context.Context, proto protocol.ID, h host.Host, ps Proto
}()
}
func sendHeartbeat(ctx context.Context, h host.Host, proto protocol.ID, p *pp.AddrInfo, hb Heartbeat, ps ProtocolStream, interval time.Duration) error {
func sendHeartbeat(ctx context.Context, h host.Host, proto protocol.ID, p *pp.AddrInfo,
hb Heartbeat, ps ProtocolStream, interval time.Duration) error {
streams := ps.Get(proto)
if len(streams) == 0 {
return errors.New("no stream for protocol heartbeat founded")