package main import ( "fmt" "net/http" "sync" ) // event — одно событие, отдаваемое подписчикам через SSE. // Type становится `event:` строкой, Data — `data:`. type event struct { Type string Data string } // eventBus — простой fan-out для SSE. Подписчик создаётся в момент // открытия GET /api/events и живёт до закрытия соединения. type eventBus struct { mu sync.Mutex subscribers map[chan event]struct{} } func newEventBus() *eventBus { return &eventBus{subscribers: make(map[chan event]struct{})} } func (b *eventBus) subscribe() chan event { ch := make(chan event, 64) b.mu.Lock() b.subscribers[ch] = struct{}{} b.mu.Unlock() return ch } func (b *eventBus) unsubscribe(ch chan event) { b.mu.Lock() delete(b.subscribers, ch) close(ch) b.mu.Unlock() } func (b *eventBus) publish(e event) { b.mu.Lock() defer b.mu.Unlock() for ch := range b.subscribers { select { case ch <- e: default: // Подписчик отстаёт — пропускаем (UI догонится снапшотом по GET /api/state) } } } // handleSSE — GET /api/events. Держит соединение, в каждом событии // отдаёт event: \ndata: \n\n. func (s *server) handleSSE(w http.ResponseWriter, r *http.Request) { flusher, ok := w.(http.Flusher) if !ok { http.Error(w, "streaming not supported", http.StatusInternalServerError) return } w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") w.WriteHeader(http.StatusOK) ch := s.state.bus.subscribe() defer s.state.bus.unsubscribe(ch) // сразу шлём snapshot, чтобы UI догнал состояние snap := s.state.Snapshot() fmt.Fprintf(w, "event: snapshot\ndata: %s\n\n", mustJSON(snap)) flusher.Flush() for { select { case <-r.Context().Done(): return case e := <-ch: if _, err := fmt.Fprintf(w, "event: %s\ndata: %s\n\n", e.Type, e.Data); err != nil { return } flusher.Flush() } } }