Skip to content

benjreinhart/server_sent_events

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

51 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

ServerSentEvents

CI License Version Docs

Lightweight, ultra-fast Server Sent Event parser for Elixir.

This module parses according to the official Server Sent Events specification with a comprehensive test suite. See Behavior Boundary for more info.

Installation

The package can be installed by adding server_sent_events to your list of dependencies in mix.exs:

def deps do
  [
    {:server_sent_events, "~> 1.0.0"}
  ]
end

Usage

Decode an enumerable of binary chunks with ServerSentEvents.decode_stream/1:

events =
  [
    "event: message\n",
    "data: {\"complete\":true}\n\n"
  ]
  |> ServerSentEvents.decode_stream()
  |> Enum.to_list()

IO.inspect(events)
# [%{event: "message", data: "{\"complete\":true}"}]

The decoder keeps state across arbitrary chunk boundaries:

events =
  [
    "event: mes",
    "sage\ndata: {\"complete\":",
    "true}\n\n"
  ]
  |> ServerSentEvents.decode_stream()
  |> Enum.to_list()

IO.inspect(events)
# [%{event: "message", data: "{\"complete\":true}"}]

Events are maps that always include :data, and may also include :id, :event, or :retry. The :id, :event, and :data values are binaries. The :retry value is a non-negative integer when present.

Real world example using Req

Req can expose the response body as an enumerable with into: :self. That body can be passed through ServerSentEvents.decode_stream/1:

From there, callers typically filter event types and JSON-decode the data field.

%Req.Response{status: 200, body: response_body} =
  Req.post!("https://api.anthropic.com/v1/messages",
    json: request,
    into: :self,
    headers: %{"x-api-key" => api_key(), "anthropic-version" => "2023-06-01"}
  )

response_body
|> ServerSentEvents.decode_stream()
|> Stream.map(fn %{data: data} -> JSON.decode!(data) end)
|> Enum.each(&IO.inspect/1)
#  %{
#    "content_block" => %{"type" => "thinking", "signature" => "", "thinking" => ""},
#    "index" => 0,
#    "type" => "content_block_start"
#  }
#  %{
#    "delta" => %{"type" => "thinking_delta", "thinking" => "Now"},
#    "index" => 0,
#    "type" => "content_block_delta"
#  }
#
#  etc...

Behavior Boundary

This library decodes the event stream syntax and applies the field-level parsing rules from the specification. In particular, id fields containing NULL are ignored, and retry fields are emitted as integers only when they contain ASCII digits. Events that do not contain a data field are suppressed.

It intentionally leaves EventSource state and connection behavior to the caller, including:

  • Tracking, resetting, or applying lastEventId.
  • Applying retry delays.
  • Supplying a default event type such as "message".
  • Opening HTTP connections, reconnecting, or interpreting response headers.

This decoder also assumes the input stream is UTF-8. It does not validate UTF-8, reject malformed input, or perform replacement-character decoding.

Benchmarking

Run the local benchmark with:

mix bench

The benchmark exercises both large complete payloads and large payloads that end with an incomplete trailing event, and reports execution time and memory usage.

About

Lightweight, ultra-fast, fully spec-conformant Server Sent Event parser

Resources

License

Stars

Watchers

Forks

Packages

 
 
 

Contributors

Languages