Recipes

Real-time with WebSocket

The framework's pkg/http package includes a zero-dependency WebSocket implementation (RFC 6455). The standalone app uses it for two real-time features: live log streaming and instant notification delivery. This recipe shows how both work and how to build your own real-time endpoints.


Basic WebSocket endpoint

Register a WebSocket endpoint like any other handler. The Upgrader handles the HTTP-to-WebSocket handshake:

upgrader := http.Upgrader{}

func streamHandler(w http.ResponseWriter, r *http.Request) {
    conn, err := upgrader.Upgrade(w, r)
    if err != nil {
        return // Upgrade writes the error response
    }
    defer conn.Close()

    // Reader goroutine detects client disconnect
    done := make(chan struct{})
    go func() {
        defer close(done)
        for {
            _, _, err := conn.ReadMessage()
            if err != nil {
                return
            }
        }
    }()

    // Send events until disconnect
    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-done:
            return
        case <-ticker.C:
            conn.WritePing(nil) // Keep-alive
        }
    }
}

The pattern is always the same: upgrade, spawn a reader goroutine to detect disconnection, then loop sending events with periodic pings for keep-alive.


Pattern 1: Log streaming

The admin panel's log viewer streams new log entries in real-time via GET /api/admin/logs/stream.

How it works

  1. Opens the current log file and seeks to the end
  2. Polls every 300ms for new lines via bufio.Reader
  3. Parses each line as JSON, applies filters, and sends matching entries
  4. Client can update filters mid-stream by sending a JSON message
func streamHandler(logsDir string) func(http.ResponseWriter, *http.Request) {
    upgrader := http.Upgrader{}

    return func(w http.ResponseWriter, r *http.Request) {
        conn, err := upgrader.Upgrade(w, r)
        if err != nil {
            return
        }
        defer conn.Close()

        // Initial filters from query params
        level := r.URL.Query().Get("level")
        search := r.URL.Query().Get("search")

        // Open log file and seek to end
        logPath := filepath.Join(logsDir, "stanza.log")
        f, err := os.Open(logPath)
        if err != nil {
            conn.CloseWithMessage(http.CloseGoingAway, "log file unavailable")
            return
        }
        defer f.Close()
        f.Seek(0, io.SeekEnd)
        reader := bufio.NewReader(f)

        // Reader goroutine — handles disconnection and filter updates
        type filterUpdate struct {
            Level  string `json:"level"`
            Search string `json:"search"`
        }
        filterCh := make(chan filterUpdate, 1)
        done := make(chan struct{})
        go func() {
            defer close(done)
            for {
                _, data, err := conn.ReadMessage()
                if err != nil {
                    return
                }
                var fu filterUpdate
                if json.Unmarshal(data, &fu) == nil {
                    select {
                    case filterCh <- fu:
                    default:
                    }
                }
            }
        }()

        // Tail loop
        poll := time.NewTicker(300 * time.Millisecond)
        defer poll.Stop()
        ping := time.NewTicker(30 * time.Second)
        defer ping.Stop()

        for {
            select {
            case <-done:
                return
            case fu := <-filterCh:
                level = fu.Level
                search = fu.Search
            case <-ping.C:
                conn.WritePing(nil)
            case <-poll.C:
                for {
                    line, err := reader.ReadString('\n')
                    if err != nil {
                        break
                    }
                    var entry map[string]any
                    if json.Unmarshal([]byte(line), &entry) != nil {
                        continue
                    }
                    // Apply filters
                    if level != "" {
                        if entryLevel, _ := entry["level"].(string); entryLevel != level {
                            continue
                        }
                    }
                    if search != "" && !matchesSearch(entry, search) {
                        continue
                    }
                    conn.WriteMessage(http.TextMessage, []byte(strings.TrimSpace(line)))
                }
            }
        }
    }
}

Why 300ms polling instead of fsnotify

Polling at 300ms is simpler and more reliable than file system notifications. It works on all operating systems including containers, has zero external dependencies, and the latency is imperceptible for log tailing. The CPU cost is negligible — it's a single ReadString call that returns immediately when there's nothing new.

Client-side filter updates

The client can change filters without reconnecting by sending a JSON message:

{"level": "error", "search": "timeout"}

The server picks up the update on the next poll cycle. This avoids the overhead of tearing down and re-establishing the WebSocket connection.


Pattern 2: Notification pub/sub

Admin notifications are delivered instantly via GET /api/admin/notifications/stream. This uses an in-memory pub/sub Hub.

The Hub

type Event struct {
    Type         string        `json:"type"`
    Notification *Notification `json:"notification,omitempty"`
    UnreadCount  int           `json:"unread_count"`
}

type Hub struct {
    mu          sync.Mutex
    subscribers map[int64][]*subscriber
}

type subscriber struct {
    ch chan Event
}

func NewHub() *Hub {
    return &Hub{subscribers: make(map[int64][]*subscriber)}
}
  • Subscribe(adminID) — returns a receive-only event channel and an unsubscribe function
  • Publish(adminID, event) — sends to all subscribers for that admin (non-blocking)
  • PublishAll(event) — broadcasts to every connected subscriber

The channel is buffered at 16 events. Sends are non-blocking — if a subscriber's buffer is full, the event is dropped rather than blocking the publisher.

Publishing on notification creation

The Service.NotifyAdmin method automatically publishes to the Hub after inserting the notification:

func (s *Service) NotifyAdmin(adminID int64, notifType, title, message string, opts ...Option) {
    // Insert notification into database
    notifications.Notify(s.db, EntityAdmin, fmt.Sprintf("%d", adminID),
        notifType, title, message, "")

    // Publish to WebSocket subscribers
    count := notifications.UnreadCount(s.db, EntityAdmin, fmt.Sprintf("%d", adminID))
    s.hub.Publish(adminID, Event{
        Type:        "notification",
        Notification: &Notification{/* ... */},
        UnreadCount: count,
    })
}

WebSocket stream endpoint

func streamHandler(svc *notifications.Service) func(http.ResponseWriter, *http.Request) {
    upgrader := http.Upgrader{}

    return func(w http.ResponseWriter, r *http.Request) {
        conn, err := upgrader.Upgrade(w, r)
        if err != nil {
            return
        }
        defer conn.Close()

        adminID := getAdminID(r) // From JWT middleware

        // Subscribe to events for this admin
        events, unsub := svc.Hub().Subscribe(adminID)
        defer unsub()

        // Send initial unread count
        count := notifications.UnreadCount(svc.DB(), "admin", fmt.Sprintf("%d", adminID))
        initial, _ := json.Marshal(Event{Type: "unread_count", UnreadCount: count})
        conn.WriteMessage(http.TextMessage, initial)

        // Reader goroutine
        done := make(chan struct{})
        go func() {
            defer close(done)
            for {
                _, _, err := conn.ReadMessage()
                if err != nil {
                    return
                }
            }
        }()

        // Stream events
        ping := time.NewTicker(30 * time.Second)
        defer ping.Stop()

        for {
            select {
            case <-done:
                return
            case evt := <-events:
                data, _ := json.Marshal(evt)
                conn.WriteMessage(http.TextMessage, data)
            case <-ping.C:
                conn.WritePing(nil)
            }
        }
    }
}

Event types

The stream sends two types of events:

Event typeWhenPayload
unread_countOn initial connection{"type":"unread_count","unread_count":3}
notificationWhen a notification is created{"type":"notification","notification":{...},"unread_count":4}

Each event includes the updated unread_count so the client never needs a separate HTTP call to stay in sync.


Building your own real-time endpoint

Follow this checklist when adding a new WebSocket endpoint:

  1. Create the upgrader — default http.Upgrader{} works for same-origin. Set CheckOrigin if you need cross-origin WebSocket.

  2. Authenticate — protect the endpoint with the same middleware as your HTTP routes. JWT authentication happens on the initial HTTP request before the upgrade.

  3. Reader goroutine — always spawn one. Even if you don't expect client messages, the reader detects disconnection. Close a done channel when it returns.

  4. Ping heartbeat — send a ping every 30 seconds. Without it, dead connections (client crashed, network dropped) won't be detected until the next write fails.

  5. Non-blocking sends — if your event source can outpace the client, use a buffered channel with select { case ch <- event: default: } to drop events rather than blocking.

  6. Clean shutdown — defer conn.Close() and unsub() (if using pub/sub). The deferred close sends a close frame to the client.


Admin panel integration

Notification bell

The notification bell connects via WebSocket on mount:

  • Receives real-time events and updates the unread badge instantly
  • Falls back to 30-second HTTP polling when WebSocket fails
  • Auto-reconnects after 5 seconds on disconnect
  • Reconnects when the tab becomes visible after being hidden
  • Shows a Wifi/WifiOff indicator for connection status

Log viewer

The log viewer uses WebSocket for the current log file:

  • "Live" mode streams new entries via WebSocket
  • Falls back to HTTP polling for rotated log files
  • Shows a streaming status indicator (connected/connecting/disconnected)
  • Filter changes are sent to the server mid-stream without reconnecting
  • Caps display at 500 entries to prevent memory growth

Tips

  • 300ms polling for file tailing. Simpler and more portable than fsnotify. The latency is not noticeable for log streaming.
  • Hub lives on the Service, not globally. The notification Hub is created inside NewService() and accessed via svc.Hub(). This keeps it testable and avoids global state.
  • Buffer size of 16. The subscriber channel buffer (16 events) handles normal bursts. If notifications arrive faster than the WebSocket can flush, events are dropped — this is intentional. The client can always fetch missed notifications via the HTTP API.
  • One reader + one writer. The framework's Conn is safe for exactly this pattern. Don't share a Conn across multiple writer goroutines without your own mutex.
  • Middleware chain compatibility. The framework's middleware wrappers all implement Unwrap(), so WebSocket upgrade works through any middleware stack. No special ordering needed.
Previous
Roles & scopes