🔥 feat: add lightweight SSE middleware#4239
Conversation
Add a small Fiber-native Server-Sent Events middleware focused on the transport layer: SSE headers, frame formatting, flushing, heartbeat comments, Last-Event-ID access, stream lifecycle context, and disconnect detection through write/flush errors. The implementation intentionally avoids bundling hub, topic routing, replay storage, auth helpers, metrics, or pub/sub bridges into core. Those remain application-level concerns and can be covered by recipes or separate packages. Also add middleware docs, focused tests, and an implementation plan that captures the review history and scope decisions.
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
WalkthroughAdds a new SSE middleware package ( Changes
Sequence DiagramsequenceDiagram
participant Client
participant Fiber as "Fiber Handler"
participant Stream
participant Heartbeat as "Heartbeat Ticker"
participant OnClose as "OnClose Callback"
Client->>Fiber: HTTP request (may include Last-Event-ID)
Fiber->>Fiber: set SSE headers, abandon request
Fiber->>Stream: create Stream (context tied to request)
Fiber->>Heartbeat: start ticker if enabled
Fiber->>Stream: invoke user Handler(c, stream)
loop Handler writes events/comments
Stream->>Stream: serialize (id,event,data,retry/comment) sanitize
Stream->>Client: flush frame
alt flush succeeds
Client-->>Stream: receives frame
else flush fails / client disconnects
Stream->>Stream: latch error, cancel context, close Done
Stream-->>Handler: subsequent writes return stream-closed error
end
end
alt handler returns or stream closed
Stream->>OnClose: invoke OnClose(c, stream.Err())
Heartbeat->>Heartbeat: stop ticker
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related issues
Possibly related PRs
Suggested labels
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 3 | ❌ 2❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #4239 +/- ##
==========================================
+ Coverage 91.17% 91.24% +0.06%
==========================================
Files 123 126 +3
Lines 12084 12296 +212
==========================================
+ Hits 11018 11219 +201
- Misses 668 675 +7
- Partials 398 402 +4
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Pull request overview
Adds a new lightweight middleware/sse package to provide Fiber-native Server-Sent Events (SSE) streaming primitives (headers, framing, flushing, heartbeats, disconnect detection) along with documentation and tests.
Changes:
- Introduces SSE middleware (
New(Config)) andStreamAPI for event/comment/retry writes with per-write flushing and lifecycle cancellation. - Adds SSE wire-format helpers (
Event, field sanitization, JSON encoding behavior, comment formatting). - Documents the new middleware and announces it in the v3 “What’s New” guide.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 1 comment.
Show a summary per file
| File | Description |
|---|---|
| middleware/sse/sse.go | Adds the Fiber middleware entrypoint and Stream lifecycle/write/heartbeat logic. |
| middleware/sse/event.go | Implements SSE frame formatting, sanitization, and JSON encoding for event data. |
| middleware/sse/config.go | Defines middleware configuration and defaults (retry + heartbeat behavior). |
| middleware/sse/sse_test.go | Adds unit/integration tests for framing, headers, heartbeats, concurrency, and error handling. |
| docs/whats_new.md | Adds SSE to the v3 change overview and introduces the new middleware section. |
| docs/middleware/sse.md | Adds middleware documentation, examples, and config/stream API reference. |
There was a problem hiding this comment.
Code Review
This pull request introduces a new Server-Sent Events (SSE) middleware for Fiber v3, providing essential transport features such as event formatting, heartbeats, and disconnect detection. The implementation includes a Stream object for handling active connections and a configurable middleware handler. Documentation and unit tests are provided. A review comment pointed out a terminology inaccuracy in the documentation where a context was described as 'closed' instead of 'canceled', providing a suggestion to fix it.
There was a problem hiding this comment.
🧹 Nitpick comments (5)
middleware/sse/sse.go (1)
194-220: Optional: hardenstopHeartbeatagainst double-invocation.The returned closure performs
close(stop), which panics if called twice. Today it's only invoked from a single deferred call site, so this is fine — but async.Once(or already-closed sentinel) would make the helper resilient against future refactors that might call it from multiple places.♻️ Proposed refactor
func (s *Stream) startHeartbeat(interval time.Duration) func() { if interval <= 0 { return nil } stop := make(chan struct{}) go func() { ticker := time.NewTicker(interval) defer ticker.Stop() for { select { case <-ticker.C: if err := s.Comment(""); err != nil { return } case <-stop: return case <-s.Done(): return } } }() - return func() { - close(stop) - } + var once sync.Once + return func() { + once.Do(func() { close(stop) }) + } }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@middleware/sse/sse.go` around lines 194 - 220, The stop closure returned by startHeartbeat currently calls close(stop) which will panic if invoked more than once; modify startHeartbeat to make the stop operation idempotent (e.g., wrap the close(stop) call with a sync.Once or check an already-closed sentinel) so calling the returned function multiple times is safe; update the returned closure in startHeartbeat to use that sync.Once (or atomic/boolean guard) to ensure close(stop) executes only once while leaving the existing ticker stop and goroutine exit behavior unchanged.middleware/sse/sse_test.go (2)
126-147: Potentially flaky timing on slow CI.
HeartbeatInterval: 10 * time.Millisecondwith a 30ms handler window leaves only ~2 expected ticks. On a slow/contended CI runner (or a test binary started under-race), the first tick may arrive after the handler returns, leaving the response body without:\n\nand failing the assertion. Consider widening the window (e.g., 5–10× interval) or polling for the heartbeat marker.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@middleware/sse/sse_test.go` around lines 126 - 147, The test Test_SSE_NewWritesHeartbeat is flaky because HeartbeatInterval is 10ms but the Handler only waits 30ms, giving very few ticks on slow CI; update the test to either increase the handler wait to at least 5–10× HeartbeatInterval (e.g., set the sleep inside Handler to 50–100ms) or implement a small polling loop that reads resp.Body until the heartbeat marker ":\n\n" appears (retrying for a short timeout) to reliably detect the heartbeat; reference the Test_SSE_NewWritesHeartbeat function, New(Config{HeartbeatInterval: ... , Handler: ...}) and stream.Done()/stream.Err() when making the change.
268-272: Minor: add a timeout around the<-closedreceive to avoid hanging the suite on regression.If the middleware ever stops invoking
OnClose, this test will deadlock indefinitely instead of failing fast. Aselectwith a deadline is consistent with the timeout patterns already used elsewhere in this file (e.g.,Test_SSE_StreamContextCanceledOnClose,Test_SSE_StreamWriteError).♻️ Proposed change
resp, err := app.Test(httptest.NewRequest(fiber.MethodGet, "/events", http.NoBody)) require.NoError(t, err) require.Equal(t, fiber.StatusOK, resp.StatusCode) - require.ErrorIs(t, <-closed, handlerErr) + select { + case err := <-closed: + require.ErrorIs(t, err, handlerErr) + case <-time.After(time.Second): + t.Fatal("OnClose was not invoked") + } }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@middleware/sse/sse_test.go` around lines 268 - 272, Test can hang waiting on the closed channel; replace the direct receive "require.ErrorIs(t, <-closed, handlerErr)" with a select that waits for either a value from the "closed" channel (asserting ErrorIs against "handlerErr") or a time.After timeout (failing the test fast). Update imports to include "time" and use the same timeout duration pattern already used in other tests (e.g., Test_SSE_StreamContextCanceledOnClose) so the test fails instead of deadlocking.docs/middleware/sse.md (1)
117-117: Nit: "Context() are closed" — contexts are canceled, not closed.Suggest tweaking the wording for accuracy with Go's context vocabulary.
📝 Proposed wording
-Every write is flushed. A failed flush closes `Done`, stores the error returned by `Err`, and lets the handler stop without relying on `fasthttp.RequestCtx.Done`, which is not a per-client disconnect signal. After a normal handler return, `Done` and `Context()` are closed while `Err()` remains `nil`; writes after that return `sse: stream closed`. +Every write is flushed. A failed flush closes `Done`, stores the error returned by `Err`, and lets the handler stop without relying on `fasthttp.RequestCtx.Done`, which is not a per-client disconnect signal. After a normal handler return, `Done` is closed and `Context()` is canceled while `Err()` remains `nil`; writes after that return `sse: stream closed`.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@docs/middleware/sse.md` at line 117, Update the sentence that currently says "After a normal handler return, `Done` and `Context()` are closed while `Err()` remains `nil`;" to use correct Go context terminology: indicate that `Done` is closed but `Context()` is canceled (not closed) while `Err()` remains `nil`. Reference the symbols `Done`, `Context()`, and `Err()` so readers see the exact change.middleware/sse/event.go (1)
89-106: Minor: untypednilDataproduces adata: \nline, unlike typednil.When
Event.Datais untypednil,eventDatareturns"", andwriteDatastill emits a singledata: \nline (becausestrings.SplitSeq("", "\n")yields one empty element). For a typed-nil pointer (the case covered byTest_SSE_EventJSONEncodesTypedNilStringer) it falls through tojson.Marshaland emitsdata: null. Consider documenting this asymmetry or treatingnilas "no data line" for parity. Not a correctness issue, just surprising for callers who passData: nilexpecting eitherdata: nullor no data line at all.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@middleware/sse/event.go` around lines 89 - 106, eventData currently returns an empty string for an untyped nil which causes writeData to emit a spurious empty "data: \n" line; change eventData to signal "no data" (e.g. change signature to return (string, bool, error) where the bool indicates whether there is data) and update writeData to skip emitting any data lines when that bool is false; keep handling for typed nils (they should still be marshaled to "null") and ensure Event.Data is the referenced symbol to locate callers.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@docs/middleware/sse.md`:
- Line 117: Update the sentence that currently says "After a normal handler
return, `Done` and `Context()` are closed while `Err()` remains `nil`;" to use
correct Go context terminology: indicate that `Done` is closed but `Context()`
is canceled (not closed) while `Err()` remains `nil`. Reference the symbols
`Done`, `Context()`, and `Err()` so readers see the exact change.
In `@middleware/sse/event.go`:
- Around line 89-106: eventData currently returns an empty string for an untyped
nil which causes writeData to emit a spurious empty "data: \n" line; change
eventData to signal "no data" (e.g. change signature to return (string, bool,
error) where the bool indicates whether there is data) and update writeData to
skip emitting any data lines when that bool is false; keep handling for typed
nils (they should still be marshaled to "null") and ensure Event.Data is the
referenced symbol to locate callers.
In `@middleware/sse/sse_test.go`:
- Around line 126-147: The test Test_SSE_NewWritesHeartbeat is flaky because
HeartbeatInterval is 10ms but the Handler only waits 30ms, giving very few ticks
on slow CI; update the test to either increase the handler wait to at least
5–10× HeartbeatInterval (e.g., set the sleep inside Handler to 50–100ms) or
implement a small polling loop that reads resp.Body until the heartbeat marker
":\n\n" appears (retrying for a short timeout) to reliably detect the heartbeat;
reference the Test_SSE_NewWritesHeartbeat function,
New(Config{HeartbeatInterval: ... , Handler: ...}) and
stream.Done()/stream.Err() when making the change.
- Around line 268-272: Test can hang waiting on the closed channel; replace the
direct receive "require.ErrorIs(t, <-closed, handlerErr)" with a select that
waits for either a value from the "closed" channel (asserting ErrorIs against
"handlerErr") or a time.After timeout (failing the test fast). Update imports to
include "time" and use the same timeout duration pattern already used in other
tests (e.g., Test_SSE_StreamContextCanceledOnClose) so the test fails instead of
deadlocking.
In `@middleware/sse/sse.go`:
- Around line 194-220: The stop closure returned by startHeartbeat currently
calls close(stop) which will panic if invoked more than once; modify
startHeartbeat to make the stop operation idempotent (e.g., wrap the close(stop)
call with a sync.Once or check an already-closed sentinel) so calling the
returned function multiple times is safe; update the returned closure in
startHeartbeat to use that sync.Once (or atomic/boolean guard) to ensure
close(stop) executes only once while leaving the existing ticker stop and
goroutine exit behavior unchanged.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 41cfc69f-76aa-4e51-ac59-c427aeace07c
📒 Files selected for processing (6)
docs/middleware/sse.mddocs/whats_new.mdmiddleware/sse/config.gomiddleware/sse/event.gomiddleware/sse/sse.gomiddleware/sse/sse_test.go
|
Addressed the remaining SSE review feedback in 1aacd75:
I also checked the failed CI run. Local verification:
|
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@middleware/sse/sse.go`:
- Around line 46-74: The OnClose hook currently uses the snapshot streamErr
captured before the heartbeat goroutine has fully stopped, so late heartbeat
write errors can be missed; update the OnClose defer (inside the
SendStreamWriter closure that creates newStream and uses cfg.Handler) to
re-check the stream's latched error by calling stream.Err() when invoking
cfg.OnClose — e.g., compute finalErr := streamErr; if finalErr == nil { finalErr
= stream.Err() } then call cfg.OnClose(c, finalErr); keep the existing defer
ordering (closeStream and stopHeartbeat defers remain) and ensure this change
references streamErr, stream.Err(), cfg.OnClose and the existing
startHeartbeat/stopHeartbeat flow so the latched heartbeat error wins over the
earlier snapshot.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 954def49-bbef-4c94-bae5-dfb07f357e0f
📒 Files selected for processing (8)
ctx_interface_gen.godocs/middleware/sse.mdmiddleware/limiter/limiter_test.gomiddleware/sse/event.gomiddleware/sse/sse.gomiddleware/sse/sse_test.goreq_interface_gen.gores_interface_gen.go
✅ Files skipped from review due to trivial changes (3)
- req_interface_gen.go
- res_interface_gen.go
- ctx_interface_gen.go
🚧 Files skipped from review as they are similar to previous changes (3)
- docs/middleware/sse.md
- middleware/sse/event.go
- middleware/sse/sse_test.go
There was a problem hiding this comment.
🧹 Nitpick comments (1)
middleware/sse/sse.go (1)
198-227: Heartbeat goroutine is not joined on stop — residual late-error window remains.
stopHeartbeatonly closes thestopchannel; it does not wait for the goroutine to exit. With defer orderstopHeartbeat → closeStream → OnClose, the heartbeat goroutine can still be mid-s.Comment("")(or about to enter it) whileOnClosereadsstream.Err()on line 53. If the final tick's write fails and latchess.errafter that read, the disconnect error is still missed byOnClose. The mutex serializes writes, so there's no data corruption — only a narrow observability gap.Joining the goroutine before
OnCloseruns would close the gap deterministically:♻️ Optional: join the heartbeat goroutine on stop
func (s *Stream) startHeartbeat(interval time.Duration) func() { if interval <= 0 { return nil } stop := make(chan struct{}) var stopOnce sync.Once + done := make(chan struct{}) go func() { + defer close(done) ticker := time.NewTicker(interval) defer ticker.Stop() for { select { case <-ticker.C: if err := s.Comment(""); err != nil { return } case <-stop: return case <-s.Done(): return } } }() return func() { stopOnce.Do(func() { close(stop) }) + <-done } }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@middleware/sse/sse.go` around lines 198 - 227, The heartbeat goroutine started by Stream.startHeartbeat can still be executing s.Comment("") after the returned stop function (stopHeartbeat) closes stop, causing a race where OnClose may read s.Err() before the goroutine finishes and sets the final error; fix this by making the stop function block until the goroutine has fully exited: inside startHeartbeat create a done signal (or sync.WaitGroup) that the goroutine closes/signals on exit and change the returned func() (stopHeartbeat) to close stop once (using stopOnce) and then wait on that done signal before returning; keep the existing select cases (ticker.C, stop, s.Done()) and ensure the done signal is triggered in every goroutine exit path so OnClose sees the final s.Err() deterministically.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@middleware/sse/sse.go`:
- Around line 198-227: The heartbeat goroutine started by Stream.startHeartbeat
can still be executing s.Comment("") after the returned stop function
(stopHeartbeat) closes stop, causing a race where OnClose may read s.Err()
before the goroutine finishes and sets the final error; fix this by making the
stop function block until the goroutine has fully exited: inside startHeartbeat
create a done signal (or sync.WaitGroup) that the goroutine closes/signals on
exit and change the returned func() (stopHeartbeat) to close stop once (using
stopOnce) and then wait on that done signal before returning; keep the existing
select cases (ticker.C, stop, s.Done()) and ensure the done signal is triggered
in every goroutine exit path so OnClose sees the final s.Err()
deterministically.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: a7cda937-3e0e-4e42-978d-c642e89a3051
📒 Files selected for processing (2)
middleware/sse/sse.gomiddleware/sse/sse_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
- middleware/sse/sse_test.go
There was a problem hiding this comment.
⚠️ Performance Alert ⚠️
Possible performance regression was detected for benchmark.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 1.50.
| Benchmark suite | Current: a039e4a | Previous: bec9ba6 | Ratio |
|---|---|---|---|
Benchmark_Compress/Zstd (github.com/gofiber/fiber/v3/middleware/compress) - B/op |
1 B/op |
0 B/op |
+∞ |
This comment was automatically generated by workflow using github-action-benchmark.
|
Addressed the latest coverage/review feedback in 1ff8f6d:
Verified locally:
|
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@middleware/sse/event.go`:
- Around line 129-134: appendData currently splits normalized data into lines
and will emit an empty "data:" field when input ends with a newline; after
calling normalizeNewlines(data) trim a single trailing "\n" (not all newlines)
before splitting so you don't produce a stray empty line (apply the same
single-trailing-newline trim in writeData if writeData also processes raw data
similarly). Locate appendData and the writeData code path, perform the
single-trailing-newline removal on the normalized string, then continue
splitting and calling appendField("data", ...) as before.
- Around line 42-55: The guards check event.ID/event.Name before sanitization,
so whitespace-only values become empty after sanitizeField and produce empty SSE
fields; update the logic in the block that handles ID and Name (the code using
sanitizeField and appendField) to call sanitizeField (or strings.TrimSpace)
first and then only call appendField if the sanitized value is non-empty,
preserving existing error handling from sanitizeField; ensure you reference
event.ID/event.Name, sanitizeField, and appendField when making the change so
whitespace-only inputs no longer emit empty "id:" or "event:" SSE lines.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 0f5a6fbe-3569-4bd7-b1ba-ef43c441579f
📒 Files selected for processing (3)
docs/middleware/sse.mdmiddleware/sse/event.gomiddleware/sse/sse.go
🚧 Files skipped from review as they are similar to previous changes (2)
- docs/middleware/sse.md
- middleware/sse/sse.go
|
@grivera64 @vinod-morya @gaby |
|
|
||
| ### SSE | ||
|
|
||
| Fiber now includes a small [SSE middleware](./middleware/sse.md) for Server-Sent Events. It handles native |
| case json.RawMessage: | ||
| return eventPayload{data: string(value), hasData: true}, nil | ||
| default: | ||
| encoded, err := json.Marshal(value) |
There was a problem hiding this comment.
This should use the JSON lib provided by the user in the app.Config
| } | ||
|
|
||
| func trimSingleTrailingNewline(value string) string { | ||
| return strings.TrimSuffix(value, "\n") |
| "github.com/gofiber/fiber/v3" | ||
| ) | ||
|
|
||
| const mimeTextEventStream = "text/event-stream" |
Summary
This PR adds a lightweight
middleware/ssepackage for Fiber v3.The middleware focuses on the SSE transport primitives that need to be
Fiber-native:
SendStreamWriterbased streamingLast-Event-IDaccessStream.Context()It deliberately does not include a built-in hub, topic router, replay
store, auth layer, metrics endpoint, or external pub/sub bridge. Those
features are useful, but they are application-specific and better suited
for recipes, contrib packages, or user code built around this primitive.
Background
This follows the discussion and review history from:
The main direction from review was to keep Fiber core focused on the SSE
protocol and streaming lifecycle, rather than shipping a full pub/sub
framework in middleware.
API