Lightweight, ultra-fast Server-Sent Events parser for Elixir.
This module parses event streams according to the official Server-Sent Events specification and is backed by a comprehensive test suite. See Behavior Boundary for more info.
The package can be installed by adding server_sent_events to your list of dependencies in mix.exs:
def deps do
  [
    {:server_sent_events, "~> 1.0.0"}
  ]
end

Decode an enumerable of binary chunks with ServerSentEvents.decode_stream/1:
events =
  [
    "event: message\n",
    "data: {\"complete\":true}\n\n"
  ]
  |> ServerSentEvents.decode_stream()
  |> Enum.to_list()

IO.inspect(events)
# [%{event: "message", data: "{\"complete\":true}"}]

The decoder keeps state across arbitrary chunk boundaries:
events =
  [
    "event: mes",
    "sage\ndata: {\"complete\":",
    "true}\n\n"
  ]
  |> ServerSentEvents.decode_stream()
  |> Enum.to_list()

IO.inspect(events)
# [%{event: "message", data: "{\"complete\":true}"}]

Events are maps that always include :data, and may also include :id, :event, or :retry.
The :id, :event, and :data values are binaries. The :retry value is a non-negative
integer when present.
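
As a rough illustration of that shape, the sketch below reads one decoded event map while treating every key except :data as optional. The module EventShapeSketch and its describe/1 function are hypothetical names made up for this example and are not part of the library. The "ms" label assumes the caller interprets :retry as milliseconds, as the specification does.

```elixir
defmodule EventShapeSketch do
  # Hypothetical helper, not part of the library: summarizes one decoded event.
  # :data is assumed to always be present; :id, :event, and :retry are optional.
  def describe(%{data: data} = event) when is_binary(data) do
    type = Map.get(event, :event, "(no event type)")
    id = Map.get(event, :id, "(no id)")

    retry =
      case Map.fetch(event, :retry) do
        {:ok, ms} when is_integer(ms) and ms >= 0 -> "#{ms} ms"
        :error -> "(no retry)"
      end

    "type=#{type} id=#{id} retry=#{retry} bytes=#{byte_size(data)}"
  end
end

IO.puts(EventShapeSketch.describe(%{data: "hello"}))
IO.puts(EventShapeSketch.describe(%{event: "update", id: "42", retry: 3000, data: "{}"}))
```

Using Map.get/3 and Map.fetch/2 rather than pattern matching on every key keeps the function usable for events that carry only :data.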
Req can expose the response body as an enumerable with into: :self. That body can be passed through ServerSentEvents.decode_stream/1. From there, callers typically filter event types and JSON-decode the data field:
%Req.Response{status: 200, body: response_body} =
  Req.post!("https://api.anthropic.com/v1/messages",
    json: request,
    into: :self,
    headers: %{"x-api-key" => api_key(), "anthropic-version" => "2023-06-01"}
  )

response_body
|> ServerSentEvents.decode_stream()
|> Stream.map(fn %{data: data} -> JSON.decode!(data) end)
|> Enum.each(&IO.inspect/1)

# %{
#   "content_block" => %{"type" => "thinking", "signature" => "", "thinking" => ""},
#   "index" => 0,
#   "type" => "content_block_start"
# }
# %{
#   "delta" => %{"type" => "thinking_delta", "thinking" => "Now"},
#   "index" => 0,
#   "type" => "content_block_delta"
# }
#
# etc...

This library decodes the event stream syntax and applies the field-level parsing rules from the
specification. In particular, id fields containing NULL are ignored, and retry fields are
emitted as integers only when they contain ASCII digits. Events that do not contain a data
field are suppressed.
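
The two field rules mentioned above can be pictured with a small standalone sketch. This is not the library's implementation. FieldRulesSketch, accept_id/1, and accept_retry/1 are hypothetical names, and treating an empty retry value as ignored is an assumption of this sketch.

```elixir
defmodule FieldRulesSketch do
  # Illustrative only: mirrors the rules described in the text, not the library's code.

  # An id value containing a NULL byte is ignored.
  def accept_id(value) when is_binary(value) do
    if String.contains?(value, <<0>>), do: :ignore, else: {:ok, value}
  end

  # A retry value is used only when it is made up entirely of ASCII digits 0-9.
  # Assumption of this sketch: an empty value is also ignored.
  def accept_retry(value) when is_binary(value) do
    if value != "" and ascii_digits?(value) do
      {:ok, String.to_integer(value)}
    else
      :ignore
    end
  end

  # Checks raw bytes, so non-ASCII digit characters never qualify.
  defp ascii_digits?(value) do
    value |> :binary.bin_to_list() |> Enum.all?(&(&1 in ?0..?9))
  end
end

IO.inspect(FieldRulesSketch.accept_retry("3000"))
IO.inspect(FieldRulesSketch.accept_retry("3s"))
IO.inspect(FieldRulesSketch.accept_id("a" <> <<0>> <> "b"))
```

Under these assumptions the three calls print {:ok, 3000}, :ignore, and :ignore respectively.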
The decoder intentionally leaves EventSource state and connection behavior to the caller, including:
- Tracking, resetting, or applying lastEventId.
- Applying retry delays.
- Supplying a default event type such as "message".
- Opening HTTP connections, reconnecting, or interpreting response headers.
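
One way a caller might handle the first three responsibilities is sketched below, working on already decoded event maps. CallerStateSketch, track/3, and the :last_event_id key are hypothetical names introduced for illustration. The sketch assumes that an event without an :event key should be treated as "message" and that the most recent :id and :retry values carry forward.

```elixir
defmodule CallerStateSketch do
  # Hypothetical caller-side bookkeeping; none of this is provided by the library.
  # Takes decoded event maps and returns {normalized_events, last_event_id, retry_ms}.
  def track(events, last_event_id \\ nil, retry_ms \\ nil) do
    {reversed, id, retry} =
      Enum.reduce(events, {[], last_event_id, retry_ms}, fn event, {acc, id, retry} ->
        # Carry the previous values forward when the event does not set new ones.
        id = Map.get(event, :id, id)
        retry = Map.get(event, :retry, retry)

        normalized =
          event
          |> Map.put_new(:event, "message")
          |> Map.put(:last_event_id, id)

        {[normalized | acc], id, retry}
      end)

    {Enum.reverse(reversed), id, retry}
  end
end

events = [
  %{data: "a", id: "1"},
  %{data: "b", event: "ping", retry: 500}
]

{normalized, last_id, retry_ms} = CallerStateSketch.track(events)
IO.inspect({normalized, last_id, retry_ms})
```

The returned last_id and retry_ms are what a reconnecting client would feed into its next request and backoff delay. Actually reconnecting is outside the scope of this sketch.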
This decoder also assumes the input stream is UTF-8. It does not validate UTF-8, reject malformed input, or perform replacement-character decoding.
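
Callers that need validation can add it themselves after decoding. The sketch below uses String.valid?/1 from the standard library to separate events whose binary fields are valid UTF-8 from those that are not. Utf8CheckSketch and partition_valid/1 are hypothetical names. Checking decoded events rather than raw chunks avoids false negatives when a chunk boundary splits a multi-byte character.

```elixir
defmodule Utf8CheckSketch do
  # Hypothetical caller-side check; the decoder itself performs no validation.
  # Returns {valid_events, invalid_events} based on the binary fields of each event.
  def partition_valid(events) do
    Enum.split_with(events, fn event ->
      event
      |> Map.take([:id, :event, :data])
      |> Map.values()
      |> Enum.all?(&String.valid?/1)
    end)
  end
end

{valid, invalid} =
  Utf8CheckSketch.partition_valid([
    %{data: "héllo"},
    %{data: <<0xFF, 0xFE>>}
  ])

IO.inspect(valid, label: "valid")
IO.inspect(length(invalid), label: "invalid count")
```

Whether to drop, log, or repair the invalid events is an application decision that this sketch leaves open.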
Run the local benchmark with:
mix bench

The benchmark exercises both large complete payloads and large payloads that end with an incomplete trailing event, and reports execution time and memory usage.